Node.js 設(shè)計模式筆記 —— 消息中間件及其應用模式(任務分發(fā))

Distributing tasks to a set of consumers

將高成本的任務委派給多個工作節(jié)點,這種類型的應用并不適合由 Pub/Sub 模式實現(xiàn)。因為我們并不想同一個任務被多個消費者收到锦针,相反我們更需要一種類似負載均衡的消息分發(fā)模式它浅。在消息系統(tǒng)術(shù)語中译柏,也被稱為 competing consumersfanout distributionventilator姐霍。
與 HTTP 負載均衡器不同的是鄙麦,任務分發(fā)系統(tǒng)中的消費者是一種更活躍的角色典唇。絕大多數(shù)時候都是消費者連接到任務隊列,請求新的任務胯府。這一點在可擴展系統(tǒng)中非常關(guān)鍵介衔,允許我們在不修改生產(chǎn)者部分的情況下,直接平滑地增加工作節(jié)點的數(shù)量骂因。
此外炎咖,在一個通用的消息系統(tǒng)中,我們沒有必要強調(diào)生產(chǎn)者和消費者之間的請求/響應通信寒波。多數(shù)情況下乘盼,更優(yōu)先的選擇是使用單向的異步通信,從而獲得更優(yōu)異的并行能力和擴展性俄烁。消息基本上總是沿著一個方向流動绸栅,這樣的管道允許我們構(gòu)建復雜的信息處理架構(gòu),又不必承受同步通信帶來的開銷页屠。

A messaging pipeline

ZeroMQ Fanout/Fanin 模式

分布式 hashsum 破解器

需要以下組件實現(xiàn)一個標準的并行管線:

  • 一個協(xié)調(diào)節(jié)點負責在多個工作節(jié)點間分發(fā)任務
  • 多個工作節(jié)點承擔具體的計算任務
  • 一個用于收集計算結(jié)果的節(jié)點
The architecture of a typical pipeline with ZeroMQ

即一個節(jié)點負責生成所有可能的字符串組合阴幌,并將它們分發(fā)給不同的工作節(jié)點;工作節(jié)點則負責計算接收到的字符串卷中,比較 hash 值阱表;最后一個節(jié)點負責收集暴力破解的結(jié)果。

實現(xiàn) producer

為了表示所有可能的字符組合陋气,這里使用 N 維索引樹溉跃。每個節(jié)點包含一個當前位置下可能出現(xiàn)的字母,比如只有 a十减、b 兩個字母的話栈幸,長度為 3 的字符串組合共有圖示的以下幾種:

Indexed n-ary tree for alphabet (a, b)

indexed-string-variation 包可以幫助我們由索引計算出對應的字符串,這項工作可以在工作節(jié)點完成帮辟,因此 producer 這里只需要將分好組的索引值分發(fā)給工作節(jié)點速址。
generateTasks.js:

export function* generateTasks(searchHash, alphabet,
    maxWordLength, batchSize) {
    let nVariations = 0
    for (let n = 1; n <= maxWordLength; n++) {
        nVariations += Math.pow(alphabet.length, n)
    }

    console.log('Finding the hashsum source string over ' +
        `${nVariations} possible variations`)

    let batchStart = 1
    while (batchStart <= nVariations) {
        const batchEnd = Math.min(
            batchStart + batchSize - 1, nVariations)
        yield {
            searchHash,
            alphabet: alphabet,
            batchStart,
            batchEnd
        }

        batchStart = batchEnd + 1
    }
}

producer.js:

import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const ventilator = new zmq.Push()
    await ventilator.bind('tcp://*:5016')
    await delay(1000)

    const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await ventilator.send(JSON.stringify(task))
    }
}

main().catch(err => console.log(err))
  • 創(chuàng)建一個 PUSH socket 并綁定給本地的 5016 端口,工作節(jié)點的 PULL socket 會連接到此端口并接收任務
  • 將每一個生成的任務字符串化由驹,通過 PUSH socket 的 send() 方法發(fā)送給工作節(jié)點芍锚。工作節(jié)點以輪詢的方式接收不同的任務
實現(xiàn) worker

process Task.js:

import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

export function processTask(task) {
    const variationGen = isv.generator(task.alphabet)
    console.log('processing from ' +
        `${variationGen(task.batchStart)} (${task.batchStart})` +
        `to ${variationGen(task.batchEnd)} (${task.batchEnd}`)

    for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
        const word = variationGen(idx)
        const shasum = createHash('sha1')
        shasum.update(word)
        const digest = shasum.digest('hex')

        if (digest === task.searchHash) {
            return word
        }
    }
}

processTask() 遍歷給定區(qū)間內(nèi)的所有索引值,對每一個索引生成對應的字符串蔓榄,再計算其 SHA1 值并炮,與傳入的 task 對象中的 searchHash 比較。

worker.js:

import zmq from 'zeromq'
import { processTask } from './processTask.js'

async function main() {
    const fromVentilator = new zmq.Pull()
    const toSink = new zmq.Push()

    fromVentilator.connect('tcp://localhost:5016')
    toSink.connect('tcp://localhost:5017')

    for await (const rawMessage of fromVentilator) {
        const found = processTask(JSON.parse(rawMessage.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await toSink.send(`Found: $found`)
        }
    }
}

main().catch(err => console.error(err))

worker.js 創(chuàng)建了兩個 socket甥郑。PULL socket 負責連接到任務發(fā)布方(Ventilator)逃魄,接收任務;PUSH socket 負責連接到結(jié)果收集方(sink)澜搅,傳遞任務執(zhí)行的結(jié)果伍俘。

實現(xiàn) results collector

collector.js:

import zmq from 'zeromq'

async function main() {
    const sink = new zmq.Pull()
    await sink.bind('tcp://*:5017')

    for await (const rawMessage of sink) {
        console.log('Message from worker: ', rawMessage.toString())
    }
}

main().catch(err => console.error(err))

運行以下命令測試結(jié)果:

node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

AMQP 實現(xiàn) pipeline 和 competing consumers

Task distribution architecture using a message queue broker

像前面那樣在點對點的模式下邪锌,實現(xiàn) pipeline 是非常直觀的。假設(shè)我們需要借助 AMQP 這類系統(tǒng)實現(xiàn)任務分配模式癌瘾,就必須確保每條消息都只會被一個消費者接收到秃流。
可以直接將任務發(fā)布到目標 queue,不經(jīng)過 exchange柳弄。避免了 exchange 有可能綁定了多個 queue 的情況舶胀。之后,多個消費者同時監(jiān)聽這一個 queue碧注,消息即會以 fanout 的方式均勻地分發(fā)給所有的消費者嚣伐。

hashsum 破解器的 AMQP 實現(xiàn)

producer-amqp.js:

import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createConfirmChannel()
    await channel.assertQueue('tasks_queue')

    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
    }

    await channel.waitForConfirms()
    channel.close()
    connection.close()
}

main().catch(err => console.error(err))
  • 此處創(chuàng)建的是一個 confirmChannel,它提供了一個 waitForConfirms() 函數(shù)萍丐,可以在 broker 確認收到消息前等待轩端,確保應用不會過早地關(guān)閉到 broker 的連接
  • channel.sendToQueue() 負責將一條消息直接發(fā)送給某個 queue,跳過任何 exchange 或者路由

worker-amqp.js:

import amqp from 'amqplib'
import { processTask } from './processTask.js'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('tasks_queue')
    channel.consume(queue, async (rawMessage) => {
        const found = processTask(
            JSON.parse(rawMessage.content.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await channel.sendToQueue('results_queue',
                Buffer.from(`Found: ${found}`))
        }
        await channel.ack(rawMessage)
    })
}

main().catch(err => console.error(err))

collector-amqp.js:

import amqp from 'amqplib'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('results_queue')
    channel.consume(queue, msg => {
        console.log(`Message from worker: ${msg.content.toString()}`)
    })
}

main().catch(err => console.error(err))

運行如下命令測試效果:

node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

通過 Redis Streams 實現(xiàn)任務分發(fā)

Redis Stream 可以借助一種叫做 consumer groups 的特性實現(xiàn)任務分發(fā)模式逝变。Consumer group 是一個有狀態(tài)的實體基茵,由一組名稱標識的消費者組成,組中的消費者會以 round-robin 的方式接收記錄壳影。
每條記錄都必須被顯式地確認拱层,否則該記錄會一直處于 pending 狀態(tài)。每個消費者都只能訪問它自己的 pending 記錄宴咧,假如消費者突然崩潰根灯,在其回到線上后會先嘗試獲取其 pending 的記錄。

A Redis Stream consumer group

Consumer group 也會記錄其讀取的上一條消息的 ID掺栅,因而在連續(xù)的讀取操作中烙肺,consumer group 知道下一條要讀取的記錄時是哪個。

producer-redis.js:

import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()

const [, , maxLength, searchHash] = process.argv

async function main() {
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await redisClient.xadd('tasks_stream', '*',
            'task', JSON.stringify(task))
    }

    redisClient.disconnect()
}

main().catch(err => console.error(err))

worker-redis.js:

import Redis from 'ioredis'
import { processTask } from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
    await redisClient.xgroup('CREATE', 'tasks_stream',
        'workers_group', '$', 'MKSTREAM')
        .catch(() => console.log('Consumer group already exists'))

    const [[, records]] = await redisClient.xreadgroup(
        'GROUP', 'workers_group', consumerName, 'STREAMS',
        'tasks_stream', '0')
    for (const [recordId, [, rawTask]] of records) {
        await processAndAck(recordId, rawTask)
    }

    while (true) {
        const [[, records]] = await redisClient.xreadgroup(
            'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
            'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
        for (const [recordId, [, rawTask]] of records) {
            await processAndAck(recordId, rawTask)
        }
    }
}

async function processAndAck(recordId, rawTask) {
    const found = processTask(JSON.parse(rawTask))
    if (found) {
        console.log(`Found! => ${found}`)
        await redisClient.xadd('results_stream', '*', 'result',
            `Found: ${found}`)
    }

    await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))
  • xgroup 命令用來確保 consumer group 存在氧卧。
    • CREATE 表示我們希望創(chuàng)建一個 consumer group
    • tasks_stream 表示我們想要讀取的 stream 的名字
    • workers_group 是 consumer group 的名字
    • 第四個參數(shù)表示 consumer group 開始讀取的記錄的位置桃笙。$ 表示當前 stream 中最后一條記錄的 ID
    • MKSTREAM 表示如果 stream 不存在則創(chuàng)建它
  • 通過 xreadgroup 命令讀取屬于當前 consumer 的所有 pending 的記錄。
    • 'GROUP'沙绝、'workers_group'搏明、consumerName 用來指代 consumer group 和 consumer 的名字
    • STREAMStasks_stream 用來指代我們想要讀取的 stream 的名字
    • 0 用來表示我們想要開始讀取的記錄的位置。這里表示從屬于當前 consumer 的第一條記錄開始宿饱,讀取所有 pending 的消息
  • 通過另外一條 xreadgroup 命令讀取 stream 里新增加的記錄熏瞄。
    • 'BLOCK''0' 兩個參數(shù)表示如果沒有新的消息脚祟,就一直阻塞等待谬以。'0' 具體表示一直等待永不超時
    • 'COUNT''1' 表示一次請求只獲取一條記錄
    • 特殊 ID > 表示只獲取還沒有被當前的 consumer group 處理過的消息
  • processAndAck() 函數(shù)負責當 xreadgroup() 返回的記錄被處理完成時,調(diào)用 xack 命令進行確認由桌,將該記錄從當前 consumer 的 pending 列表里移除

collector-redis.js:

import Redis from 'ioredis'

const redisClient = new Redis()

async function main() {
    let lastRecordId = '$'
    while (true) {
        const data = await redisClient.xread(
            'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
        for (const [, logs] of data) {
            for (const [recordId, [, message]] of logs) {
                console.log(`Message from worker: ${message}`)
                lastRecordId = recordId
            }
        }
    }
}

main().catch(err => console.error(err))

運行程序測試效果:

node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

參考資料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末为黎,一起剝皮案震驚了整個濱河市邮丰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌铭乾,老刑警劉巖剪廉,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異炕檩,居然都是意外死亡斗蒋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門笛质,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泉沾,“玉大人,你說我怎么就攤上這事妇押□尉浚” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵敲霍,是天一觀的道長俊马。 經(jīng)常有香客問我,道長肩杈,這世上最難降的妖魔是什么柴我? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮扩然,結(jié)果婚禮上屯换,老公的妹妹穿的比我還像新娘。我一直安慰自己与学,他們只是感情好彤悔,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著索守,像睡著了一般晕窑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上卵佛,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天杨赤,我揣著相機與錄音,去河邊找鬼截汪。 笑死疾牲,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的衙解。 我是一名探鬼主播阳柔,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蚓峦!你這毒婦竟也來了舌剂?” 一聲冷哼從身側(cè)響起济锄,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎霍转,沒想到半個月后荐绝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡避消,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年低滩,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岩喷。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡委造,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出均驶,到底是詐尸還是另有隱情昏兆,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布妇穴,位于F島的核電站爬虱,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏腾它。R本人自食惡果不足惜跑筝,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瞒滴。 院中可真熱鬧曲梗,春花似錦、人聲如沸妓忍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽世剖。三九已至定罢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間旁瘫,已是汗流浹背祖凫。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留酬凳,地道東北人惠况。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像宁仔,于是被迫代替她去往敵國和親稠屠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容