主要有兩類技術(shù)可以用來(lái)整合分布式應(yīng)用:一類是通過(guò)共享存儲(chǔ)作為一個(gè)中心化的協(xié)調(diào)者鞠鲜,跟蹤和保存所有需要共享的信息;另一類則是通過(guò)消息中間件断国,向系統(tǒng)中的所有節(jié)點(diǎn)散布數(shù)據(jù)贤姆、事件和命令等。
消息存在于軟件系統(tǒng)的各個(gè)層級(jí)稳衬。我們通過(guò)互聯(lián)網(wǎng)交換消息完成通信霞捡;通過(guò)管道發(fā)送消息給其他進(jìn)程;設(shè)備驅(qū)動(dòng)通過(guò)消息與硬件進(jìn)行交互等等薄疚。任何用于在組件和系統(tǒng)之間交換信息的離散或結(jié)構(gòu)化數(shù)據(jù)都可以視為消息碧信。
消息系統(tǒng)基礎(chǔ)
對(duì)于消息系統(tǒng),有以下四個(gè)基本要素需要考慮:
- 通訊的方向街夭∨椴辏可以是單向的,也可以是“請(qǐng)求 - 響應(yīng)”模式
- 通訊的目的板丽。同時(shí)決定了消息本身的內(nèi)容
- 消息的時(shí)效性呈枉。可以同步或者異步地發(fā)送與接收
- 消息的投遞方式埃碱〔瑁可以直接投遞也可以通過(guò)某個(gè)中間件
單向 vs “請(qǐng)求 - 應(yīng)答”模式
單向模式:消息從源頭推送到目的地。常見(jiàn)的應(yīng)用比如郵件系統(tǒng)乃正、將工作任務(wù)分派給一系列工作節(jié)點(diǎn)的系統(tǒng)住册。
“請(qǐng)求 - 響應(yīng)”模式:一方發(fā)出的消息總能夠與對(duì)方發(fā)出的消息匹配。比如 web 服務(wù)的調(diào)用瓮具、向數(shù)據(jù)庫(kù)請(qǐng)求數(shù)據(jù)等荧飞。
包含多個(gè)響應(yīng)節(jié)點(diǎn)的“請(qǐng)求 - 響應(yīng)”模式:
消息類型
消息內(nèi)容主要取決于通信的目的凡人。通常有以下三種:
- 命令消息
- 事件消息
- 文檔消息
命令消息用來(lái)令接收者觸發(fā)某個(gè)動(dòng)作或者任務(wù)。借助它可以實(shí)現(xiàn)遠(yuǎn)程過(guò)程調(diào)用(RPC)系統(tǒng)叹阔,分布式計(jì)算等挠轴。RESTful HTTP 請(qǐng)求就是簡(jiǎn)單的命令消息的例子。
事件消息用來(lái)通知另一個(gè)組件發(fā)生了某些情況耳幢。事件在分布式系統(tǒng)中是一種很重要的整合機(jī)制岸晦,用來(lái)確保系統(tǒng)的各個(gè)組件保持同樣的步調(diào)。
文檔消息基本上就是在組件之間傳輸數(shù)據(jù)睛藻。比如數(shù)據(jù)庫(kù)請(qǐng)求的結(jié)果启上。
異步隊(duì)列和流
同步通信類似于打電話。電話的雙方必須同時(shí)在線店印,連接到同一個(gè)通道冈在,實(shí)時(shí)地交流信息。當(dāng)我們需要打給另一個(gè)人時(shí)按摘,通常就得搞一部新的手機(jī)或者掛掉當(dāng)前正在進(jìn)行的通話包券,撥打新的號(hào)碼。
異步通信類似于發(fā)短信炫贤。我們發(fā)送短信的時(shí)刻溅固,并不需要接收方已經(jīng)接入了網(wǎng)絡(luò)。我們可以一條接一條地發(fā)送多條短信給不同的人兰珍,以任意順序接收對(duì)方的回復(fù)(如果有的話)侍郭。
另一個(gè)異步通信的重要特性就是,消息可以被臨時(shí)存儲(chǔ)在某個(gè)地方俩垃,再在之后的某個(gè)時(shí)間送達(dá)励幼。當(dāng)接收方非常忙碌無(wú)法處理新的消息,或者我們需要確保投遞的成功率時(shí)口柳,這個(gè)特性就非常有用了苹粟。
消息隊(duì)列就是這樣一種在生產(chǎn)者和消費(fèi)者之間存儲(chǔ)消息的中間組件。若消費(fèi)者因?yàn)槟撤N原因崩潰跃闹、斷開(kāi)連接等嵌削,消息會(huì)在隊(duì)列中累積,待消費(fèi)者重新上線時(shí)立即進(jìn)行分發(fā)望艺。
另外一種類似的數(shù)據(jù)結(jié)構(gòu)是 log苛秕。log 是一種只能追加的結(jié)構(gòu),它是持久的找默,其消息可以在到達(dá)時(shí)被讀取艇劫,也可以通過(guò)訪問(wèn)其歷史記錄來(lái)獲取。在消息系統(tǒng)中惩激,也常被叫做 stream店煞。
不同于隊(duì)列蟹演,在 stream 中,消息被消費(fèi)后不會(huì)被移除顷蟀,意味著 stream 在消息的獲取方面有著更高的自由度酒请。隊(duì)列通常一次只暴露一條消息給消費(fèi)者,而一個(gè) stream 能夠被多個(gè)消費(fèi)者共享(甚至是同一份消息)鸣个。
消息隊(duì)列:
流:
點(diǎn)對(duì)點(diǎn) vs 消息中間件
“發(fā)布 - 訂閱” 模式
就是一種分布式的觀察者模式羞反。
一個(gè)最小化的實(shí)時(shí)聊天應(yīng)用
package.json:
{
"type": "module",
"dependencies": {
"amqplib": "^0.10.3",
"ioredis": "^5.2.4",
"JSONStream": "^1.3.5",
"level": "^8.0.0",
"leveldown": "^6.1.1",
"levelup": "^5.1.1",
"monotonic-timestamp": "^0.0.9",
"serve-handler": "^6.1.5",
"superagent": "^8.0.6",
"ws": "^8.11.0",
"yargs": "^17.6.2",
"zeromq": "^6.0.0-beta.16"
}
}
index.js:
import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(`${msg}`)
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState == ws.OPEN) {
client.send(msg)
}
}
}
server.listen(process.argv[2] || 8000)
- 首先創(chuàng)建一個(gè) HTTP 服務(wù),將所有請(qǐng)求轉(zhuǎn)發(fā)給一個(gè)特別的 handler(
staticHandler
)囤萤,該 handler 負(fù)責(zé) serve 所有的靜態(tài)文件 - 創(chuàng)建一個(gè) WebSocket 服務(wù)實(shí)例昼窗,綁定到 HTTP 服務(wù)拖陆。同時(shí)監(jiān)聽(tīng)來(lái)自 WebSocket 客戶端的連接請(qǐng)求,以及客戶端發(fā)送的消息
- 當(dāng)某個(gè)客戶端發(fā)送的新消息到達(dá)時(shí)匣沼,通過(guò)
broadcast()
函數(shù)將消息廣播給所有的客戶端
www/index.html:
<!DOCTYPE html>
<html>
<body>
Messages:
<div id="messages"></div>
<form id="msgForm">
<input type="text" placeholder="Send a message" id="msgBox"/>
<input type="submit" value="Send"/>
</form>
<script>
const ws = new WebSocket(
`ws://${window.document.location.host}`
)
ws.onmessage = function (message) {
const msgDiv = document.createElement('div')
msgDiv.innerHTML = message.data
document.getElementById('messages').appendChild(msgDiv)
}
const form = document.getElementById('msgForm')
form.addEventListener('submit', (event) => {
event.preventDefault()
const message = document.getElementById('msgBox').value
ws.send(message)
document.getElementById('msgBox').value = ''
})
</script>
</body>
</html>
通過(guò) node index.js 8002
命令運(yùn)行應(yīng)用壮虫,打開(kāi)兩個(gè)瀏覽器頁(yè)面訪問(wèn) Web 服務(wù),測(cè)試聊天效果:
但我們的應(yīng)用是無(wú)法進(jìn)行橫向擴(kuò)展的荤崇。比如再啟動(dòng)一個(gè)新的服務(wù)實(shí)例 node index.js 8003
,此時(shí)連接到 8002 的客戶端無(wú)法與連接到 8003 的客戶端通信〈盗瘢可以自行測(cè)試。
使用 Redis 作為消息中間件
架構(gòu)圖如下所示滚婉。每個(gè)服務(wù)實(shí)例都會(huì)把從客戶端收到的消息發(fā)布到消息中間件图筹,同時(shí)也會(huì)通過(guò)中間件訂閱從其他服務(wù)實(shí)例發(fā)布的消息。
- 通過(guò)客戶端網(wǎng)頁(yè)發(fā)送的消息傳遞給對(duì)應(yīng)的 chat server
- chat server 把收到的消息發(fā)布到 Redis
- Redis 將收到的消息分發(fā)給所有的訂閱方(chat server)
- chat server 將收到的消息再分發(fā)給所有連接的客戶端
index-redis.js:
import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import Redis from 'ioredis'
const redisSub = new Redis()
const redisPub = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisPub.publish('chat_message', `${msg}`)
})
})
redisSub.subscribe('chat_message')
redisSub.on('message', (channel, msg) => {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
})
server.listen(process.argv[2] || 8000)
運(yùn)行 node index-redis.js 8002
让腹、node index-redis.js 8003
兩條命令啟動(dòng)兩個(gè)服務(wù)實(shí)例远剩,此時(shí)連接到不同服務(wù)器的客戶端相互之間也能夠進(jìn)行通信。
點(diǎn)對(duì)點(diǎn) Pub/Sub 模式
通過(guò) ZeroMQ 創(chuàng)建兩種類型的 socket:PUB
和 SUB
骇窍。PUB socket 綁定到本地機(jī)器的某個(gè)端口瓜晤,負(fù)責(zé)監(jiān)聽(tīng)來(lái)自其他機(jī)器上 SUB socket 的訂閱請(qǐng)求。當(dāng)一條消息通過(guò) PUB socket 發(fā)送時(shí)腹纳,該消息會(huì)被廣播到所有連接的 SUB socket痢掠。
index-zeromq.js:
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import yargs from 'yargs'
import zmq from 'zeromq'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
let pubSocket
async function initializeSockets() {
pubSocket = new zmq.Publisher()
await pubSocket.bind(`tcp://127.0.0.1:${yargs(process.argv).argv.pub}`)
const subSocket = new zmq.Subscriber()
const subPorts = [].concat(yargs(process.argv).argv.sub)
for (const port of subPorts) {
console.log(`Subscribing to ${port}`)
subSocket.connect(`tcp://127.0.0.1:${port}`)
}
subSocket.subscribe('chat')
for await (const [msg] of subSocket) {
console.log(`Message from another server: ${msg}`)
broadcast(msg.toString().split(' ')[1])
}
}
initializeSockets()
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(`${msg}`)
pubSocket.send(`chat ${msg}`)
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(yargs(process.argv).argv.http || 8000)
- 通過(guò)
yargs
模塊解析命令行參數(shù) - 通過(guò)
initializeSocket()
函數(shù)創(chuàng)建 Publisher,并綁定到由--pub
命令行參數(shù)提供的端口上 - 創(chuàng)建 Subscriber socket 并將其連接到其他應(yīng)用實(shí)例的 Publisher socket嘲恍。被連接的 Publisher 端口由
--sub
命令行參數(shù)提供足画。之后創(chuàng)建以chat
為過(guò)濾器的訂閱,即只接收以chat
開(kāi)頭的消息 - 通過(guò)
for
循環(huán)監(jiān)聽(tīng)到達(dá) Subscriber 的消息佃牛,去除消息中的chat
前綴淹辞,通過(guò)broadcast()
函數(shù)將處理后的消息廣播給所有連接的客戶端 - 當(dāng)有消息到達(dá)當(dāng)前實(shí)例的 WebSocket 服務(wù)時(shí),廣播此消息到所有客戶端俘侠,同時(shí)通過(guò) Publisher 發(fā)布該消息
運(yùn)行服務(wù)測(cè)試效果:
node index-zeromq.js --http 8002 --pub 5000 --sub 5001 --sub 5002
node index-zeromq.js --http 8003 --pub 5001 --sub 5000 --sub 5002
node index-zeromq.js --http 8004 --pub 5002 --sub 5000 --sub 5001
通過(guò)隊(duì)列實(shí)現(xiàn)可靠的消息投遞
消息隊(duì)列是消息系統(tǒng)中的一種重要抽象象缀。借助消息隊(duì)列蔬将,通信中的發(fā)送方和接收方不必同時(shí)處于活躍的連接狀態(tài)。隊(duì)列系統(tǒng)會(huì)負(fù)責(zé)存儲(chǔ)未投遞的消息攻冷,直到目標(biāo)處于能夠接收的狀態(tài)娃胆。
消息系統(tǒng)的投遞機(jī)制可以簡(jiǎn)單概況為以下 3 類:
- 最多一次:fire-and-forget。消息不會(huì)被持久化等曼,投遞狀態(tài)也不會(huì)被確認(rèn)里烦。意味著在接收者崩潰或者斷開(kāi)連接時(shí),消息有可能丟失
- 最少一次:消息會(huì)確保至少被收到一次禁谦。但是重復(fù)收取同一條消息的情況有可能出現(xiàn)胁黑,比如接收者在收到消息后突然崩潰,沒(méi)有來(lái)得及告知發(fā)送者消息已經(jīng)收到州泊。
- 只有一次:這是最可靠的投遞機(jī)制丧蘸,保證消息只會(huì)被接收一次。但由于需要更復(fù)雜的確認(rèn)機(jī)制遥皂,會(huì)犧牲一部分消息投遞的效率力喷。
當(dāng)消息投遞機(jī)制可以實(shí)現(xiàn)“最少一次”或者“只有一次”時(shí),我們就有了 durable subscriber演训。
AMQP
AMQP 是一個(gè)被很多消息系統(tǒng)支持的開(kāi)放標(biāo)準(zhǔn)協(xié)議弟孟。除了定義一個(gè)通用的傳輸協(xié)議以外,他還提供了用于描述 routing样悟、filtering拂募、queuing、reliability 和 security 的模型窟她。
- Queue:用于存儲(chǔ)消息的數(shù)據(jù)結(jié)構(gòu)陈症。假如多個(gè)消費(fèi)者綁定了同一個(gè)隊(duì)列,消息在它們之間是負(fù)載均衡的震糖。隊(duì)列可以是以下任意一種類型:
- Durable:當(dāng)中間件重啟時(shí)隊(duì)列會(huì)自動(dòng)重建录肯。但這并不意味著其內(nèi)容也會(huì)被保留。實(shí)際上只有標(biāo)記為持久化消息的內(nèi)容才會(huì)被保存到磁盤试伙,并在重啟時(shí)恢復(fù)
- Exclusive:隊(duì)列只綁定給唯一一個(gè)特定的訂閱者嘁信,當(dāng)連接關(guān)閉時(shí),隊(duì)列即被銷毀
- Auto-delete:當(dāng)最后一個(gè)訂閱者斷開(kāi)連接時(shí)疏叨,隊(duì)列被刪除
- Exchange:消息發(fā)布的地方潘靖。Exchange 會(huì)將消息路由至一個(gè)或者多個(gè) queue。路由規(guī)則取決于具體的實(shí)現(xiàn):
- Direct exchange:通過(guò)完整匹配一個(gè) routing key 來(lái)對(duì)消息進(jìn)行路由(如
chat.msg
) - Topic exchange:對(duì) routing key 進(jìn)行模糊匹配(如
chat.#
匹配所有以chat
開(kāi)頭的 key) - Fanout exchange:將消息廣播至所有連接的 queue蚤蔓,忽略提供的任何 routing key
- Direct exchange:通過(guò)完整匹配一個(gè) routing key 來(lái)對(duì)消息進(jìn)行路由(如
- Binding:Exchange 和 queue 之間的鏈接卦溢,定義了用于過(guò)濾消息的 routing key 或模式
上述所有組件由中間件進(jìn)行維護(hù),同時(shí)對(duì)外暴露用于創(chuàng)建和維護(hù)的 API。當(dāng)連接到某個(gè)中間件時(shí)单寂,客戶端會(huì)創(chuàng)建一個(gè) channel 對(duì)象負(fù)責(zé)維護(hù)通信的狀態(tài)贬芥。
AMQP 和 RabbitMQ 實(shí)現(xiàn) durable subscriber
chat 應(yīng)用和消息歷史記錄服務(wù)的架構(gòu)圖:
AMQP 和數(shù)據(jù)庫(kù)實(shí)現(xiàn) history service
此模塊由兩部分組成:一個(gè) HTTP 服務(wù)負(fù)責(zé)將聊天歷史記錄暴露給客戶端;一個(gè) AMQP 消費(fèi)者負(fù)責(zé)獲取聊天消息并將它們保存在本地?cái)?shù)據(jù)庫(kù)中宣决。
historySvc.js:
import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
async function main() {
const db = levelup(leveldown('./msgHistory'))
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = channel.assertQueue('chat_history')
await channel.bindQueue(queue, 'chat')
channel.consume(queue, async msg => {
const content = msg.content.toString()
console.log(`Saving message: ${content}`)
await db.put(timestamp(), content)
channel.ack(msg)
})
createServer((req, res) => {
res.writeHead(200)
db.createValueStream()
.pipe(JSONStream.stringify())
.pipe(res)
}).listen(8090)
}
main().catch(err => console.error(err))
- 創(chuàng)建一個(gè)到 AMQP 中間件的連接
- 設(shè)置一個(gè)名為
chat
的 fanout 模式的 exchange蘸劈。assertExchange()
函數(shù)會(huì)確保相應(yīng)的 exchange 存在,否則就創(chuàng)建 - 創(chuàng)建一個(gè)名為
chat_history
的 queue尊沸,綁定給上一步中創(chuàng)建的 exchange - 開(kāi)始監(jiān)聽(tīng)來(lái)自 queue 的消息威沫,將收到的每一條消息保存至 LevelDB 數(shù)據(jù)庫(kù),以時(shí)間戳作為鍵洼专。消息保存成功后由
channel.ack(msg)
進(jìn)行確認(rèn)棒掠。若確認(rèn)動(dòng)作未被中間件收到,則該條消息會(huì)保留在隊(duì)列中再次被處理
index-amqp.js
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'
const httpPort = process.argv[2] || 8000
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = await channel.assertQueue(
`chat_srv_${httpPort}`,
{ exclusive: true })
await channel.bindQueue(queue, 'chat')
channel.consume(queue, msg => {
msg = msg.content.toString()
console.log(`From queue: ${msg}`)
broadcast(msg)
}, { noAck: true })
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
channel.publish('chat', '', Buffer.from(msg))
})
superagent
.get('http://localhost:8090')
.on('error', err => console.log(err))
.pipe(JSONStream.parse('*'))
.on('data', msg => {
client.send(Buffer(msg).toString())
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(httpPort)
}
main().catch(err => console.log(err))
- 我們的聊天服務(wù)沒(méi)必要是 durable subscriber屁商,fire-and-forget 機(jī)制就足夠了烟很,因而有
{ exclusive: true }
選項(xiàng) - 確認(rèn)機(jī)制也是不需要的。
{ noAck: true }
- 發(fā)布消息也很簡(jiǎn)單蜡镶,只需要指定目標(biāo) exchange(
chat
)和一個(gè) routing key 即可雾袱,這里我們使用的是 fanout exchange,不需要路由官还,routing key 為空 - 發(fā)布到 exchange 的消息被轉(zhuǎn)發(fā)到所有綁定的 queue谜酒,再到達(dá)所有訂閱了 queue 的服務(wù)實(shí)例,每個(gè)實(shí)例再將消息發(fā)送到所有連接的客戶端
- 通過(guò)
superagent
請(qǐng)求 history 微服務(wù)妻枕,將獲取到的所有歷史消息發(fā)送給剛連接的客戶端
運(yùn)行服務(wù)測(cè)試效果:
node index-amqp.js 8002
node index-amqp.js 8003
node historySvc.js
通過(guò) streams 實(shí)現(xiàn)可靠的消息投遞
在系統(tǒng)集成的范疇里,stream(或 log)是一種有序的粘驰、只能追加的持久化的數(shù)據(jù)結(jié)構(gòu)屡谐。Stream 概念里的 message 更應(yīng)該叫做 record,總是被添加到 stream 末尾蝌数,且不會(huì)在被消費(fèi)之后自動(dòng)刪除(不同于 queue)愕掏。這種特性令 stream 更像是一種數(shù)據(jù)倉(cāng)庫(kù)而不是消息中間件。
Stream 的另一個(gè)重要特性在于顶伞,record 是被消費(fèi)者從 stream 中“拉取”的饵撑,因而消費(fèi)者可以按照自己的節(jié)奏處理 record。
Stream 可以用來(lái)實(shí)現(xiàn)可靠的消息投遞唆貌,一旦消費(fèi)者崩潰滑潘,它可以在恢復(fù)后從中斷的地方繼續(xù)拉取消息。
Streams vs 消息隊(duì)列
Stream 明顯的應(yīng)用場(chǎng)景在于處理順序的流數(shù)據(jù)锨咙,也支持批量處理或者根據(jù)之前的消息確定相關(guān)性语卤,并可以跨多個(gè)節(jié)點(diǎn)分發(fā)數(shù)據(jù)。
Stream 和消息隊(duì)列都可以實(shí)現(xiàn) Pub/Sub 模式,但消息隊(duì)列更適合復(fù)雜的系統(tǒng)集成任務(wù)粹舵,它可以提供更復(fù)雜的路由機(jī)制钮孵,允許我們?yōu)椴煌南⑻峁┎煌膬?yōu)先級(jí),而 Stream 中 record 的順序是一定的眼滤。
通過(guò) Redis Streams 實(shí)現(xiàn) chat 應(yīng)用
index-stream.js:
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import Redis from 'ioredis'
const redisClient = new Redis()
const redisClientXRead = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', async client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisClient.xadd('chat_stream', '*', 'message', msg)
})
const logs = await redisClient.xrange(
'chat_stream', '-', '+')
for (const [, [, message]] of logs) {
client.send(message)
}
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
let lastRecordId = '$'
async function processStreamMessages() {
while (true) {
const [[, records]] = await redisClientXRead.xread(
'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
for (const [recordId, [, message]] of records) {
console.log(`Message from stream: ${message}`)
broadcast(message)
lastRecordId = recordId
}
}
}
processStreamMessages().catch(err => console.error(err))
server.listen(process.argv[2] || 8080)
-
xadd
負(fù)責(zé)在收到來(lái)自客戶端的消息時(shí)巴席,向 stream 添加一條新的 record。它接收 3 個(gè)參數(shù):- Stream 的名字诅需,這里是
chat_stream
- record 的 ID漾唉。這里傳入的是星號(hào)(
*
),令 Redis 為我們生成一個(gè) ID诱担。ID 必須是單調(diào)遞增的毡证,以保持 record 的順序,而 Redis 可以替我們處理這些 - key-value 的列表蔫仙。這里只提供 value
msg
(從客戶端收到的消息)的 'message' key
- Stream 的名字诅需,這里是
- 使用
xrange
檢索 stream 的過(guò)往記錄料睛,以獲取聊天歷史。我們?cè)诿看斡锌蛻舳诉B接時(shí)就進(jìn)行一次檢索摇邦。其中-
表示最小的 ID 值恤煞,+
表示最大的 ID 值,因而整個(gè)xrange
會(huì)獲取當(dāng)前 stream 中所有的消息 - 最后一部分的邏輯是等待新的記錄被添加到 stream 中施籍,從而每個(gè)應(yīng)用實(shí)例都能讀取到更新的消息居扒。這里使用一個(gè)無(wú)線循環(huán)和
xread
命令:- 其中
BLOCK
表示在新消息到達(dá)前阻塞 -
0
用來(lái)指定超時(shí)時(shí)間丑慎,超過(guò)這個(gè)時(shí)間則直接返回null
喜喂。0
代表不超時(shí) -
STREAMS
是一個(gè)關(guān)鍵字,告訴 Redis 我們接下來(lái)會(huì)指定想要讀取的 stream 的細(xì)節(jié) -
chat_stream
是 stream 的名字 - 最后我們提供 record ID(
lastRecordId
)作為讀取新消息的節(jié)點(diǎn)竿裂。初始情況下是$
玉吁,表示當(dāng)前 stream 中最大的 ID。當(dāng)我們讀取第一條消息后腻异,更新lastRecordId
為最近讀取到的消息的 ID
- 其中
此外进副,解包消息的代碼 for (const [, [, message]] of logs) {...}
實(shí)際上等同于 for (const [recordId, [propertyId, message]] of logs) {...}
,由 xrange
命令查詢到的消息的格式如下:
[
["1588590110918-0", ["message", "This is a message"]],
["1588590130852-0", ["message", "This is another message"]]
]