Node.js 設(shè)計(jì)模式筆記 —— 消息中間件及其應(yīng)用模式(發(fā)布訂閱)

主要有兩類技術(shù)可以用來(lái)整合分布式應(yīng)用:一類是通過(guò)共享存儲(chǔ)作為一個(gè)中心化的協(xié)調(diào)者鞠鲜,跟蹤和保存所有需要共享的信息;另一類則是通過(guò)消息中間件断国,向系統(tǒng)中的所有節(jié)點(diǎn)散布數(shù)據(jù)贤姆、事件和命令等。
消息存在于軟件系統(tǒng)的各個(gè)層級(jí)稳衬。我們通過(guò)互聯(lián)網(wǎng)交換消息完成通信霞捡;通過(guò)管道發(fā)送消息給其他進(jìn)程;設(shè)備驅(qū)動(dòng)通過(guò)消息與硬件進(jìn)行交互等等薄疚。任何用于在組件和系統(tǒng)之間交換信息的離散或結(jié)構(gòu)化數(shù)據(jù)都可以視為消息碧信。

消息系統(tǒng)基礎(chǔ)

對(duì)于消息系統(tǒng),有以下四個(gè)基本要素需要考慮:

  • 通訊的方向街夭∨椴辏可以是單向的,也可以是“請(qǐng)求 - 響應(yīng)”模式
  • 通訊的目的板丽。同時(shí)決定了消息本身的內(nèi)容
  • 消息的時(shí)效性呈枉。可以同步或者異步地發(fā)送與接收
  • 消息的投遞方式埃碱〔瑁可以直接投遞也可以通過(guò)某個(gè)中間件

單向 vs “請(qǐng)求 - 應(yīng)答”模式

單向模式:消息從源頭推送到目的地。常見(jiàn)的應(yīng)用比如郵件系統(tǒng)乃正、將工作任務(wù)分派給一系列工作節(jié)點(diǎn)的系統(tǒng)住册。


單向通信

“請(qǐng)求 - 響應(yīng)”模式:一方發(fā)出的消息總能夠與對(duì)方發(fā)出的消息匹配。比如 web 服務(wù)的調(diào)用瓮具、向數(shù)據(jù)庫(kù)請(qǐng)求數(shù)據(jù)等荧飞。

Request/Reply

包含多個(gè)響應(yīng)節(jié)點(diǎn)的“請(qǐng)求 - 響應(yīng)”模式:

Multi-node request/reply

消息類型

消息內(nèi)容主要取決于通信的目的凡人。通常有以下三種:

  • 命令消息
  • 事件消息
  • 文檔消息

命令消息用來(lái)令接收者觸發(fā)某個(gè)動(dòng)作或者任務(wù)。借助它可以實(shí)現(xiàn)遠(yuǎn)程過(guò)程調(diào)用(RPC)系統(tǒng)叹阔,分布式計(jì)算等挠轴。RESTful HTTP 請(qǐng)求就是簡(jiǎn)單的命令消息的例子。
事件消息用來(lái)通知另一個(gè)組件發(fā)生了某些情況耳幢。事件在分布式系統(tǒng)中是一種很重要的整合機(jī)制岸晦,用來(lái)確保系統(tǒng)的各個(gè)組件保持同樣的步調(diào)。
文檔消息基本上就是在組件之間傳輸數(shù)據(jù)睛藻。比如數(shù)據(jù)庫(kù)請(qǐng)求的結(jié)果启上。

異步隊(duì)列和流

同步通信類似于打電話。電話的雙方必須同時(shí)在線店印,連接到同一個(gè)通道冈在,實(shí)時(shí)地交流信息。當(dāng)我們需要打給另一個(gè)人時(shí)按摘,通常就得搞一部新的手機(jī)或者掛掉當(dāng)前正在進(jìn)行的通話包券,撥打新的號(hào)碼。
異步通信類似于發(fā)短信炫贤。我們發(fā)送短信的時(shí)刻溅固,并不需要接收方已經(jīng)接入了網(wǎng)絡(luò)。我們可以一條接一條地發(fā)送多條短信給不同的人兰珍,以任意順序接收對(duì)方的回復(fù)(如果有的話)侍郭。

另一個(gè)異步通信的重要特性就是,消息可以被臨時(shí)存儲(chǔ)在某個(gè)地方俩垃,再在之后的某個(gè)時(shí)間送達(dá)励幼。當(dāng)接收方非常忙碌無(wú)法處理新的消息,或者我們需要確保投遞的成功率時(shí)口柳,這個(gè)特性就非常有用了苹粟。
消息隊(duì)列就是這樣一種在生產(chǎn)者和消費(fèi)者之間存儲(chǔ)消息的中間組件。若消費(fèi)者因?yàn)槟撤N原因崩潰跃闹、斷開(kāi)連接等嵌削,消息會(huì)在隊(duì)列中累積,待消費(fèi)者重新上線時(shí)立即進(jìn)行分發(fā)望艺。

另外一種類似的數(shù)據(jù)結(jié)構(gòu)是 log苛秕。log 是一種只能追加的結(jié)構(gòu),它是持久的找默,其消息可以在到達(dá)時(shí)被讀取艇劫,也可以通過(guò)訪問(wèn)其歷史記錄來(lái)獲取。在消息系統(tǒng)中惩激,也常被叫做 stream店煞。
不同于隊(duì)列蟹演,在 stream 中,消息被消費(fèi)后不會(huì)被移除顷蟀,意味著 stream 在消息的獲取方面有著更高的自由度酒请。隊(duì)列通常一次只暴露一條消息給消費(fèi)者,而一個(gè) stream 能夠被多個(gè)消費(fèi)者共享(甚至是同一份消息)鸣个。

消息隊(duì)列:


message queue

流:


stream

點(diǎn)對(duì)點(diǎn) vs 消息中間件

peer-to-peer vs broker

“發(fā)布 - 訂閱” 模式

就是一種分布式的觀察者模式羞反。

Pub/Sub

一個(gè)最小化的實(shí)時(shí)聊天應(yīng)用

package.json:

{
    "type": "module",
    "dependencies": {
        "amqplib": "^0.10.3",
        "ioredis": "^5.2.4",
        "JSONStream": "^1.3.5",
        "level": "^8.0.0",
        "leveldown": "^6.1.1",
        "levelup": "^5.1.1",
        "monotonic-timestamp": "^0.0.9",
        "serve-handler": "^6.1.5",
        "superagent": "^8.0.6",
        "ws": "^8.11.0",
        "yargs": "^17.6.2",
        "zeromq": "^6.0.0-beta.16"
    }
}

index.js:

import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        broadcast(`${msg}`)
    })
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState == ws.OPEN) {
            client.send(msg)
        }
    }
}

server.listen(process.argv[2] || 8000)
  • 首先創(chuàng)建一個(gè) HTTP 服務(wù),將所有請(qǐng)求轉(zhuǎn)發(fā)給一個(gè)特別的 handler(staticHandler)囤萤,該 handler 負(fù)責(zé) serve 所有的靜態(tài)文件
  • 創(chuàng)建一個(gè) WebSocket 服務(wù)實(shí)例昼窗,綁定到 HTTP 服務(wù)拖陆。同時(shí)監(jiān)聽(tīng)來(lái)自 WebSocket 客戶端的連接請(qǐng)求,以及客戶端發(fā)送的消息
  • 當(dāng)某個(gè)客戶端發(fā)送的新消息到達(dá)時(shí)匣沼,通過(guò) broadcast() 函數(shù)將消息廣播給所有的客戶端

www/index.html:

<!DOCTYPE html>
<html>
  <body>
    Messages:
    <div id="messages"></div>
    <form id="msgForm">
      <input type="text" placeholder="Send a message" id="msgBox"/>
      <input type="submit" value="Send"/>

    </form>
    <script>
      const ws = new WebSocket(
          `ws://${window.document.location.host}`
      )
      ws.onmessage = function (message) {
          const msgDiv = document.createElement('div')
          msgDiv.innerHTML = message.data
          document.getElementById('messages').appendChild(msgDiv)
      }
      const form = document.getElementById('msgForm')
      form.addEventListener('submit', (event) => {
          event.preventDefault()
          const message = document.getElementById('msgBox').value
          ws.send(message)
          document.getElementById('msgBox').value = ''
      })
    </script>
  </body>
</html>

通過(guò) node index.js 8002 命令運(yùn)行應(yīng)用壮虫,打開(kāi)兩個(gè)瀏覽器頁(yè)面訪問(wèn) Web 服務(wù),測(cè)試聊天效果:

simple chat

但我們的應(yīng)用是無(wú)法進(jìn)行橫向擴(kuò)展的荤崇。比如再啟動(dòng)一個(gè)新的服務(wù)實(shí)例 node index.js 8003,此時(shí)連接到 8002 的客戶端無(wú)法與連接到 8003 的客戶端通信〈盗瘢可以自行測(cè)試。

使用 Redis 作為消息中間件

架構(gòu)圖如下所示滚婉。每個(gè)服務(wù)實(shí)例都會(huì)把從客戶端收到的消息發(fā)布到消息中間件图筹,同時(shí)也會(huì)通過(guò)中間件訂閱從其他服務(wù)實(shí)例發(fā)布的消息。

message broker
  • 通過(guò)客戶端網(wǎng)頁(yè)發(fā)送的消息傳遞給對(duì)應(yīng)的 chat server
  • chat server 把收到的消息發(fā)布到 Redis
  • Redis 將收到的消息分發(fā)給所有的訂閱方(chat server)
  • chat server 將收到的消息再分發(fā)給所有連接的客戶端

index-redis.js:

import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import Redis from 'ioredis'

const redisSub = new Redis()
const redisPub = new Redis()

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        redisPub.publish('chat_message', `${msg}`)
    })
})

redisSub.subscribe('chat_message')

redisSub.on('message', (channel, msg) => {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
})

server.listen(process.argv[2] || 8000)

運(yùn)行 node index-redis.js 8002让腹、node index-redis.js 8003 兩條命令啟動(dòng)兩個(gè)服務(wù)實(shí)例远剩,此時(shí)連接到不同服務(wù)器的客戶端相互之間也能夠進(jìn)行通信。

message broker

點(diǎn)對(duì)點(diǎn) Pub/Sub 模式

通過(guò) ZeroMQ 創(chuàng)建兩種類型的 socket:PUBSUB骇窍。PUB socket 綁定到本地機(jī)器的某個(gè)端口瓜晤,負(fù)責(zé)監(jiān)聽(tīng)來(lái)自其他機(jī)器上 SUB socket 的訂閱請(qǐng)求。當(dāng)一條消息通過(guò) PUB socket 發(fā)送時(shí)腹纳,該消息會(huì)被廣播到所有連接的 SUB socket痢掠。

peer to peer

index-zeromq.js:

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import yargs from 'yargs'
import zmq from 'zeromq'

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

let pubSocket
async function initializeSockets() {
    pubSocket = new zmq.Publisher()
    await pubSocket.bind(`tcp://127.0.0.1:${yargs(process.argv).argv.pub}`)

    const subSocket = new zmq.Subscriber()
    const subPorts = [].concat(yargs(process.argv).argv.sub)
    for (const port of subPorts) {
        console.log(`Subscribing to ${port}`)
        subSocket.connect(`tcp://127.0.0.1:${port}`)
    }

    subSocket.subscribe('chat')

    for await (const [msg] of subSocket) {
        console.log(`Message from another server: ${msg}`)
        broadcast(msg.toString().split(' ')[1])
    }
}

initializeSockets()

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        broadcast(`${msg}`)
        pubSocket.send(`chat ${msg}`)
    })
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
}

server.listen(yargs(process.argv).argv.http || 8000)
  • 通過(guò) yargs 模塊解析命令行參數(shù)
  • 通過(guò) initializeSocket() 函數(shù)創(chuàng)建 Publisher,并綁定到由 --pub 命令行參數(shù)提供的端口上
  • 創(chuàng)建 Subscriber socket 并將其連接到其他應(yīng)用實(shí)例的 Publisher socket嘲恍。被連接的 Publisher 端口由 --sub 命令行參數(shù)提供足画。之后創(chuàng)建以 chat 為過(guò)濾器的訂閱,即只接收以 chat 開(kāi)頭的消息
  • 通過(guò) for 循環(huán)監(jiān)聽(tīng)到達(dá) Subscriber 的消息佃牛,去除消息中的 chat 前綴淹辞,通過(guò) broadcast() 函數(shù)將處理后的消息廣播給所有連接的客戶端
  • 當(dāng)有消息到達(dá)當(dāng)前實(shí)例的 WebSocket 服務(wù)時(shí),廣播此消息到所有客戶端俘侠,同時(shí)通過(guò) Publisher 發(fā)布該消息

運(yùn)行服務(wù)測(cè)試效果:

node index-zeromq.js --http 8002 --pub 5000 --sub 5001 --sub 5002
node index-zeromq.js --http 8003 --pub 5001 --sub 5000 --sub 5002
node index-zeromq.js --http 8004 --pub 5002 --sub 5000 --sub 5001
peer to peer

通過(guò)隊(duì)列實(shí)現(xiàn)可靠的消息投遞

消息隊(duì)列是消息系統(tǒng)中的一種重要抽象象缀。借助消息隊(duì)列蔬将,通信中的發(fā)送方和接收方不必同時(shí)處于活躍的連接狀態(tài)。隊(duì)列系統(tǒng)會(huì)負(fù)責(zé)存儲(chǔ)未投遞的消息攻冷,直到目標(biāo)處于能夠接收的狀態(tài)娃胆。

消息系統(tǒng)的投遞機(jī)制可以簡(jiǎn)單概況為以下 3 類:

  • 最多一次:fire-and-forget。消息不會(huì)被持久化等曼,投遞狀態(tài)也不會(huì)被確認(rèn)里烦。意味著在接收者崩潰或者斷開(kāi)連接時(shí),消息有可能丟失
  • 最少一次:消息會(huì)確保至少被收到一次禁谦。但是重復(fù)收取同一條消息的情況有可能出現(xiàn)胁黑,比如接收者在收到消息后突然崩潰,沒(méi)有來(lái)得及告知發(fā)送者消息已經(jīng)收到州泊。
  • 只有一次:這是最可靠的投遞機(jī)制丧蘸,保證消息只會(huì)被接收一次。但由于需要更復(fù)雜的確認(rèn)機(jī)制遥皂,會(huì)犧牲一部分消息投遞的效率力喷。

當(dāng)消息投遞機(jī)制可以實(shí)現(xiàn)“最少一次”或者“只有一次”時(shí),我們就有了 durable subscriber演训。

durable subscriber
AMQP

AMQP 是一個(gè)被很多消息系統(tǒng)支持的開(kāi)放標(biāo)準(zhǔn)協(xié)議弟孟。除了定義一個(gè)通用的傳輸協(xié)議以外,他還提供了用于描述 routing样悟、filtering拂募、queuing、reliability 和 security 的模型窟她。

AMQP
  • Queue:用于存儲(chǔ)消息的數(shù)據(jù)結(jié)構(gòu)陈症。假如多個(gè)消費(fèi)者綁定了同一個(gè)隊(duì)列,消息在它們之間是負(fù)載均衡的震糖。隊(duì)列可以是以下任意一種類型:
    • Durable:當(dāng)中間件重啟時(shí)隊(duì)列會(huì)自動(dòng)重建录肯。但這并不意味著其內(nèi)容也會(huì)被保留。實(shí)際上只有標(biāo)記為持久化消息的內(nèi)容才會(huì)被保存到磁盤试伙,并在重啟時(shí)恢復(fù)
    • Exclusive:隊(duì)列只綁定給唯一一個(gè)特定的訂閱者嘁信,當(dāng)連接關(guān)閉時(shí),隊(duì)列即被銷毀
    • Auto-delete:當(dāng)最后一個(gè)訂閱者斷開(kāi)連接時(shí)疏叨,隊(duì)列被刪除
  • Exchange:消息發(fā)布的地方潘靖。Exchange 會(huì)將消息路由至一個(gè)或者多個(gè) queue。路由規(guī)則取決于具體的實(shí)現(xiàn):
    • Direct exchange:通過(guò)完整匹配一個(gè) routing key 來(lái)對(duì)消息進(jìn)行路由(如 chat.msg
    • Topic exchange:對(duì) routing key 進(jìn)行模糊匹配(如 chat.# 匹配所有以 chat 開(kāi)頭的 key)
    • Fanout exchange:將消息廣播至所有連接的 queue蚤蔓,忽略提供的任何 routing key
  • Binding:Exchange 和 queue 之間的鏈接卦溢,定義了用于過(guò)濾消息的 routing key 或模式

上述所有組件由中間件進(jìn)行維護(hù),同時(shí)對(duì)外暴露用于創(chuàng)建和維護(hù)的 API。當(dāng)連接到某個(gè)中間件時(shí)单寂,客戶端會(huì)創(chuàng)建一個(gè) channel 對(duì)象負(fù)責(zé)維護(hù)通信的狀態(tài)贬芥。

AMQP 和 RabbitMQ 實(shí)現(xiàn) durable subscriber

chat 應(yīng)用和消息歷史記錄服務(wù)的架構(gòu)圖:

AMQP and history service
AMQP 和數(shù)據(jù)庫(kù)實(shí)現(xiàn) history service

此模塊由兩部分組成:一個(gè) HTTP 服務(wù)負(fù)責(zé)將聊天歷史記錄暴露給客戶端;一個(gè) AMQP 消費(fèi)者負(fù)責(zé)獲取聊天消息并將它們保存在本地?cái)?shù)據(jù)庫(kù)中宣决。

historySvc.js:

import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'


async function main() {
    const db = levelup(leveldown('./msgHistory'))

    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    await channel.assertExchange('chat', 'fanout')
    const { queue } = channel.assertQueue('chat_history')
    await channel.bindQueue(queue, 'chat')
    channel.consume(queue, async msg => {
        const content = msg.content.toString()
        console.log(`Saving message: ${content}`)
        await db.put(timestamp(), content)
        channel.ack(msg)
    })

    createServer((req, res) => {
        res.writeHead(200)
        db.createValueStream()
            .pipe(JSONStream.stringify())
            .pipe(res)
    }).listen(8090)
}

main().catch(err => console.error(err))
  • 創(chuàng)建一個(gè)到 AMQP 中間件的連接
  • 設(shè)置一個(gè)名為 chat 的 fanout 模式的 exchange蘸劈。assertExchange() 函數(shù)會(huì)確保相應(yīng)的 exchange 存在,否則就創(chuàng)建
  • 創(chuàng)建一個(gè)名為 chat_history 的 queue尊沸,綁定給上一步中創(chuàng)建的 exchange
  • 開(kāi)始監(jiān)聽(tīng)來(lái)自 queue 的消息威沫,將收到的每一條消息保存至 LevelDB 數(shù)據(jù)庫(kù),以時(shí)間戳作為鍵洼专。消息保存成功后由 channel.ack(msg) 進(jìn)行確認(rèn)棒掠。若確認(rèn)動(dòng)作未被中間件收到,則該條消息會(huì)保留在隊(duì)列中再次被處理

index-amqp.js

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'

const httpPort = process.argv[2] || 8000

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    await channel.assertExchange('chat', 'fanout')
    const { queue } = await channel.assertQueue(
        `chat_srv_${httpPort}`,
        { exclusive: true })
    await channel.bindQueue(queue, 'chat')
    channel.consume(queue, msg => {
        msg = msg.content.toString()
        console.log(`From queue: ${msg}`)
        broadcast(msg)
    }, { noAck: true })

    const server = createServer((req, res) => {
        return staticHandler(req, res, { public: 'www' })
    })

    const wss = new WebSocketServer({ server })
    wss.on('connection', client => {
        console.log('Client connected')

        client.on('message', msg => {
            console.log(`Message: ${msg}`)
            channel.publish('chat', '', Buffer.from(msg))
        })

        superagent
            .get('http://localhost:8090')
            .on('error', err => console.log(err))
            .pipe(JSONStream.parse('*'))
            .on('data', msg => {
                client.send(Buffer(msg).toString())
            })
    })

    function broadcast(msg) {
        for (const client of wss.clients) {
            if (client.readyState === ws.OPEN) {
                client.send(msg)
            }
        }
    }
    server.listen(httpPort)
}

main().catch(err => console.log(err))
  • 我們的聊天服務(wù)沒(méi)必要是 durable subscriber屁商,fire-and-forget 機(jī)制就足夠了烟很,因而有 { exclusive: true } 選項(xiàng)
  • 確認(rèn)機(jī)制也是不需要的。{ noAck: true }
  • 發(fā)布消息也很簡(jiǎn)單蜡镶,只需要指定目標(biāo) exchange(chat)和一個(gè) routing key 即可雾袱,這里我們使用的是 fanout exchange,不需要路由官还,routing key 為空
  • 發(fā)布到 exchange 的消息被轉(zhuǎn)發(fā)到所有綁定的 queue谜酒,再到達(dá)所有訂閱了 queue 的服務(wù)實(shí)例,每個(gè)實(shí)例再將消息發(fā)送到所有連接的客戶端
  • 通過(guò) superagent 請(qǐng)求 history 微服務(wù)妻枕,將獲取到的所有歷史消息發(fā)送給剛連接的客戶端

運(yùn)行服務(wù)測(cè)試效果:

node index-amqp.js 8002
node index-amqp.js 8003
node historySvc.js

通過(guò) streams 實(shí)現(xiàn)可靠的消息投遞

在系統(tǒng)集成的范疇里,stream(或 log)是一種有序的粘驰、只能追加的持久化的數(shù)據(jù)結(jié)構(gòu)屡谐。Stream 概念里的 message 更應(yīng)該叫做 record,總是被添加到 stream 末尾蝌数,且不會(huì)在被消費(fèi)之后自動(dòng)刪除(不同于 queue)愕掏。這種特性令 stream 更像是一種數(shù)據(jù)倉(cāng)庫(kù)而不是消息中間件。
Stream 的另一個(gè)重要特性在于顶伞,record 是被消費(fèi)者從 stream 中“拉取”的饵撑,因而消費(fèi)者可以按照自己的節(jié)奏處理 record。
Stream 可以用來(lái)實(shí)現(xiàn)可靠的消息投遞唆貌,一旦消費(fèi)者崩潰滑潘,它可以在恢復(fù)后從中斷的地方繼續(xù)拉取消息。

Reliable message delivery with streams
Streams vs 消息隊(duì)列

Stream 明顯的應(yīng)用場(chǎng)景在于處理順序的流數(shù)據(jù)锨咙,也支持批量處理或者根據(jù)之前的消息確定相關(guān)性语卤,并可以跨多個(gè)節(jié)點(diǎn)分發(fā)數(shù)據(jù)。
Stream 和消息隊(duì)列都可以實(shí)現(xiàn) Pub/Sub 模式,但消息隊(duì)列更適合復(fù)雜的系統(tǒng)集成任務(wù)粹舵,它可以提供更復(fù)雜的路由機(jī)制钮孵,允許我們?yōu)椴煌南⑻峁┎煌膬?yōu)先級(jí),而 Stream 中 record 的順序是一定的眼滤。

通過(guò) Redis Streams 實(shí)現(xiàn) chat 應(yīng)用

index-stream.js:

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import Redis from 'ioredis'


const redisClient = new Redis()
const redisClientXRead = new Redis()

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', async client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        redisClient.xadd('chat_stream', '*', 'message', msg)
    })

    const logs = await redisClient.xrange(
        'chat_stream', '-', '+')
    for (const [, [, message]] of logs) {
        client.send(message)
    }
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
}


let lastRecordId = '$'

async function processStreamMessages() {
    while (true) {
        const [[, records]] = await redisClientXRead.xread(
            'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
        for (const [recordId, [, message]] of records) {
            console.log(`Message from stream: ${message}`)
            broadcast(message)
            lastRecordId = recordId
        }
    }
}

processStreamMessages().catch(err => console.error(err))

server.listen(process.argv[2] || 8080)
  • xadd 負(fù)責(zé)在收到來(lái)自客戶端的消息時(shí)巴席,向 stream 添加一條新的 record。它接收 3 個(gè)參數(shù):
    • Stream 的名字诅需,這里是 chat_stream
    • record 的 ID漾唉。這里傳入的是星號(hào)(*),令 Redis 為我們生成一個(gè) ID诱担。ID 必須是單調(diào)遞增的毡证,以保持 record 的順序,而 Redis 可以替我們處理這些
    • key-value 的列表蔫仙。這里只提供 value msg(從客戶端收到的消息)的 'message' key
  • 使用 xrange 檢索 stream 的過(guò)往記錄料睛,以獲取聊天歷史。我們?cè)诿看斡锌蛻舳诉B接時(shí)就進(jìn)行一次檢索摇邦。其中 - 表示最小的 ID 值恤煞,+ 表示最大的 ID 值,因而整個(gè) xrange 會(huì)獲取當(dāng)前 stream 中所有的消息
  • 最后一部分的邏輯是等待新的記錄被添加到 stream 中施籍,從而每個(gè)應(yīng)用實(shí)例都能讀取到更新的消息居扒。這里使用一個(gè)無(wú)線循環(huán)和 xread 命令:
    • 其中 BLOCK 表示在新消息到達(dá)前阻塞
    • 0 用來(lái)指定超時(shí)時(shí)間丑慎,超過(guò)這個(gè)時(shí)間則直接返回 null喜喂。0 代表不超時(shí)
    • STREAMS 是一個(gè)關(guān)鍵字,告訴 Redis 我們接下來(lái)會(huì)指定想要讀取的 stream 的細(xì)節(jié)
    • chat_stream 是 stream 的名字
    • 最后我們提供 record ID(lastRecordId)作為讀取新消息的節(jié)點(diǎn)竿裂。初始情況下是 $玉吁,表示當(dāng)前 stream 中最大的 ID。當(dāng)我們讀取第一條消息后腻异,更新 lastRecordId 為最近讀取到的消息的 ID

此外进副,解包消息的代碼 for (const [, [, message]] of logs) {...} 實(shí)際上等同于 for (const [recordId, [propertyId, message]] of logs) {...},由 xrange 命令查詢到的消息的格式如下:

[
  ["1588590110918-0", ["message", "This is a message"]],
  ["1588590130852-0", ["message", "This is another message"]]
]

參考資料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末悔常,一起剝皮案震驚了整個(gè)濱河市影斑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌机打,老刑警劉巖矫户,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異姐帚,居然都是意外死亡吏垮,警方通過(guò)查閱死者的電腦和手機(jī)障涯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)膳汪,“玉大人唯蝶,你說(shuō)我怎么就攤上這事∫潘裕” “怎么了粘我?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)痹换。 經(jīng)常有香客問(wèn)我征字,道長(zhǎng),這世上最難降的妖魔是什么娇豫? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任匙姜,我火速辦了婚禮,結(jié)果婚禮上冯痢,老公的妹妹穿的比我還像新娘氮昧。我一直安慰自己,他們只是感情好浦楣,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布袖肥。 她就那樣靜靜地躺著,像睡著了一般椎组。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上哮洽,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天匪凉,我揣著相機(jī)與錄音,去河邊找鬼。 笑死碗旅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的叮喳。 我是一名探鬼主播锣咒,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼艇潭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了戏蔑?” 一聲冷哼從身側(cè)響起蹋凝,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎总棵,沒(méi)想到半個(gè)月后鳍寂,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡情龄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年迄汛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了捍壤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鞍爱,死狀恐怖鹃觉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情硬霍,我是刑警寧澤帜慢,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站唯卖,受9級(jí)特大地震影響粱玲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拜轨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一抽减、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧橄碾,春花似錦卵沉、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至拒垃,卻和暖如春停撞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背悼瓮。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工戈毒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人横堡。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓埋市,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親命贴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子道宅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容