將高成本的任務委派給多個工作節(jié)點,這種類型的應用并不適合由 Pub/Sub 模式實現(xiàn)。因為我們并不想同一個任務被多個消費者收到锦针,相反我們更需要一種類似負載均衡的消息分發(fā)模式它浅。在消息系統(tǒng)術(shù)語中译柏,也被稱為 competing consumers,fanout distribution 或 ventilator姐霍。
與 HTTP 負載均衡器不同的是鄙麦,任務分發(fā)系統(tǒng)中的消費者是一種更活躍的角色典唇。絕大多數(shù)時候都是消費者連接到任務隊列,請求新的任務胯府。這一點在可擴展系統(tǒng)中非常關(guān)鍵介衔,允許我們在不修改生產(chǎn)者部分的情況下,直接平滑地增加工作節(jié)點的數(shù)量骂因。
此外炎咖,在一個通用的消息系統(tǒng)中,我們沒有必要強調(diào)生產(chǎn)者和消費者之間的請求/響應通信寒波。多數(shù)情況下乘盼,更優(yōu)先的選擇是使用單向的異步通信,從而獲得更優(yōu)異的并行能力和擴展性俄烁。消息基本上總是沿著一個方向流動绸栅,這樣的管道允許我們構(gòu)建復雜的信息處理架構(gòu),又不必承受同步通信帶來的開銷页屠。
ZeroMQ Fanout/Fanin 模式
分布式 hashsum 破解器
需要以下組件實現(xiàn)一個標準的并行管線:
- 一個協(xié)調(diào)節(jié)點負責在多個工作節(jié)點間分發(fā)任務
- 多個工作節(jié)點承擔具體的計算任務
- 一個用于收集計算結(jié)果的節(jié)點
即一個節(jié)點負責生成所有可能的字符串組合阴幌,并將它們分發(fā)給不同的工作節(jié)點;工作節(jié)點則負責計算接收到的字符串卷中,比較 hash 值阱表;最后一個節(jié)點負責收集暴力破解的結(jié)果。
實現(xiàn) producer
為了表示所有可能的字符組合陋气,這里使用 N 維索引樹溉跃。每個節(jié)點包含一個當前位置下可能出現(xiàn)的字母,比如只有 a
十减、b
兩個字母的話栈幸,長度為 3 的字符串組合共有圖示的以下幾種:
indexed-string-variation
包可以幫助我們由索引計算出對應的字符串,這項工作可以在工作節(jié)點完成帮辟,因此 producer 這里只需要將分好組的索引值分發(fā)給工作節(jié)點速址。
generateTasks.js:
export function* generateTasks(searchHash, alphabet,
maxWordLength, batchSize) {
let nVariations = 0
for (let n = 1; n <= maxWordLength; n++) {
nVariations += Math.pow(alphabet.length, n)
}
console.log('Finding the hashsum source string over ' +
`${nVariations} possible variations`)
let batchStart = 1
while (batchStart <= nVariations) {
const batchEnd = Math.min(
batchStart + batchSize - 1, nVariations)
yield {
searchHash,
alphabet: alphabet,
batchStart,
batchEnd
}
batchStart = batchEnd + 1
}
}
producer.js:
import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const ventilator = new zmq.Push()
await ventilator.bind('tcp://*:5016')
await delay(1000)
const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await ventilator.send(JSON.stringify(task))
}
}
main().catch(err => console.log(err))
- 創(chuàng)建一個 PUSH socket 并綁定給本地的 5016 端口,工作節(jié)點的 PULL socket 會連接到此端口并接收任務
- 將每一個生成的任務字符串化由驹,通過 PUSH socket 的
send()
方法發(fā)送給工作節(jié)點芍锚。工作節(jié)點以輪詢的方式接收不同的任務
實現(xiàn) worker
process Task.js:
import isv from 'indexed-string-variation'
import { createHash } from 'crypto'
export function processTask(task) {
const variationGen = isv.generator(task.alphabet)
console.log('processing from ' +
`${variationGen(task.batchStart)} (${task.batchStart})` +
`to ${variationGen(task.batchEnd)} (${task.batchEnd}`)
for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
const word = variationGen(idx)
const shasum = createHash('sha1')
shasum.update(word)
const digest = shasum.digest('hex')
if (digest === task.searchHash) {
return word
}
}
}
processTask()
遍歷給定區(qū)間內(nèi)的所有索引值,對每一個索引生成對應的字符串蔓榄,再計算其 SHA1 值并炮,與傳入的 task
對象中的 searchHash
比較。
worker.js:
import zmq from 'zeromq'
import { processTask } from './processTask.js'
async function main() {
const fromVentilator = new zmq.Pull()
const toSink = new zmq.Push()
fromVentilator.connect('tcp://localhost:5016')
toSink.connect('tcp://localhost:5017')
for await (const rawMessage of fromVentilator) {
const found = processTask(JSON.parse(rawMessage.toString()))
if (found) {
console.log(`Found! => ${found}`)
await toSink.send(`Found: $found`)
}
}
}
main().catch(err => console.error(err))
worker.js
創(chuàng)建了兩個 socket甥郑。PULL socket 負責連接到任務發(fā)布方(Ventilator)逃魄,接收任務;PUSH socket 負責連接到結(jié)果收集方(sink)澜搅,傳遞任務執(zhí)行的結(jié)果伍俘。
實現(xiàn) results collector
collector.js:
import zmq from 'zeromq'
async function main() {
const sink = new zmq.Pull()
await sink.bind('tcp://*:5017')
for await (const rawMessage of sink) {
console.log('Message from worker: ', rawMessage.toString())
}
}
main().catch(err => console.error(err))
運行以下命令測試結(jié)果:
node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
AMQP 實現(xiàn) pipeline 和 competing consumers
像前面那樣在點對點的模式下邪锌,實現(xiàn) pipeline 是非常直觀的。假設(shè)我們需要借助 AMQP 這類系統(tǒng)實現(xiàn)任務分配模式癌瘾,就必須確保每條消息都只會被一個消費者接收到秃流。
可以直接將任務發(fā)布到目標 queue,不經(jīng)過 exchange柳弄。避免了 exchange 有可能綁定了多個 queue 的情況舶胀。之后,多個消費者同時監(jiān)聽這一個 queue碧注,消息即會以 fanout 的方式均勻地分發(fā)給所有的消費者嚣伐。
hashsum 破解器的 AMQP 實現(xiàn)
producer-amqp.js:
import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createConfirmChannel()
await channel.assertQueue('tasks_queue')
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
}
await channel.waitForConfirms()
channel.close()
connection.close()
}
main().catch(err => console.error(err))
- 此處創(chuàng)建的是一個
confirmChannel
,它提供了一個waitForConfirms()
函數(shù)萍丐,可以在 broker 確認收到消息前等待轩端,確保應用不會過早地關(guān)閉到 broker 的連接 -
channel.sendToQueue()
負責將一條消息直接發(fā)送給某個 queue,跳過任何 exchange 或者路由
worker-amqp.js:
import amqp from 'amqplib'
import { processTask } from './processTask.js'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('tasks_queue')
channel.consume(queue, async (rawMessage) => {
const found = processTask(
JSON.parse(rawMessage.content.toString()))
if (found) {
console.log(`Found! => ${found}`)
await channel.sendToQueue('results_queue',
Buffer.from(`Found: ${found}`))
}
await channel.ack(rawMessage)
})
}
main().catch(err => console.error(err))
collector-amqp.js:
import amqp from 'amqplib'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('results_queue')
channel.consume(queue, msg => {
console.log(`Message from worker: ${msg.content.toString()}`)
})
}
main().catch(err => console.error(err))
運行如下命令測試效果:
node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
通過 Redis Streams 實現(xiàn)任務分發(fā)
Redis Stream 可以借助一種叫做 consumer groups 的特性實現(xiàn)任務分發(fā)模式逝变。Consumer group 是一個有狀態(tài)的實體基茵,由一組名稱標識的消費者組成,組中的消費者會以 round-robin 的方式接收記錄壳影。
每條記錄都必須被顯式地確認拱层,否則該記錄會一直處于 pending 狀態(tài)。每個消費者都只能訪問它自己的 pending 記錄宴咧,假如消費者突然崩潰根灯,在其回到線上后會先嘗試獲取其 pending 的記錄。
Consumer group 也會記錄其讀取的上一條消息的 ID掺栅,因而在連續(xù)的讀取操作中烙肺,consumer group 知道下一條要讀取的記錄時是哪個。
producer-redis.js:
import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()
const [, , maxLength, searchHash] = process.argv
async function main() {
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await redisClient.xadd('tasks_stream', '*',
'task', JSON.stringify(task))
}
redisClient.disconnect()
}
main().catch(err => console.error(err))
worker-redis.js:
import Redis from 'ioredis'
import { processTask } from './processTask.js'
const redisClient = new Redis()
const [, , consumerName] = process.argv
async function main() {
await redisClient.xgroup('CREATE', 'tasks_stream',
'workers_group', '$', 'MKSTREAM')
.catch(() => console.log('Consumer group already exists'))
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'STREAMS',
'tasks_stream', '0')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
}
}
async function processAndAck(recordId, rawTask) {
const found = processTask(JSON.parse(rawTask))
if (found) {
console.log(`Found! => ${found}`)
await redisClient.xadd('results_stream', '*', 'result',
`Found: ${found}`)
}
await redisClient.xack('tasks_stream', 'workers_group', recordId)
}
main().catch(err => console.error(err))
-
xgroup
命令用來確保 consumer group 存在氧卧。-
CREATE
表示我們希望創(chuàng)建一個 consumer group -
tasks_stream
表示我們想要讀取的 stream 的名字 -
workers_group
是 consumer group 的名字 - 第四個參數(shù)表示 consumer group 開始讀取的記錄的位置桃笙。
$
表示當前 stream 中最后一條記錄的 ID -
MKSTREAM
表示如果 stream 不存在則創(chuàng)建它
-
- 通過
xreadgroup
命令讀取屬于當前 consumer 的所有 pending 的記錄。-
'GROUP'
沙绝、'workers_group'
搏明、consumerName
用來指代 consumer group 和 consumer 的名字 -
STREAMS
和tasks_stream
用來指代我們想要讀取的 stream 的名字 -
0
用來表示我們想要開始讀取的記錄的位置。這里表示從屬于當前 consumer 的第一條記錄開始宿饱,讀取所有 pending 的消息
-
- 通過另外一條
xreadgroup
命令讀取 stream 里新增加的記錄熏瞄。-
'BLOCK'
和'0'
兩個參數(shù)表示如果沒有新的消息脚祟,就一直阻塞等待谬以。'0'
具體表示一直等待永不超時 -
'COUNT'
和'1'
表示一次請求只獲取一條記錄 - 特殊 ID
>
表示只獲取還沒有被當前的 consumer group 處理過的消息
-
-
processAndAck()
函數(shù)負責當xreadgroup()
返回的記錄被處理完成時,調(diào)用xack
命令進行確認由桌,將該記錄從當前 consumer 的 pending 列表里移除
collector-redis.js:
import Redis from 'ioredis'
const redisClient = new Redis()
async function main() {
let lastRecordId = '$'
while (true) {
const data = await redisClient.xread(
'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
for (const [, logs] of data) {
for (const [recordId, [, message]] of logs) {
console.log(`Message from worker: ${message}`)
lastRecordId = recordId
}
}
}
}
main().catch(err => console.error(err))
運行程序測試效果:
node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b