分布式websocket解決方案

前景

我們?cè)谧鼍W(wǎng)頁聊天的話击狮,基本上都是用到websocket去做。如果單臺(tái)服務(wù)器難以支撐的話益老,我們就會(huì)考慮加機(jī)器彪蓬,做成分布式,做成集群模式捺萌。這個(gè)時(shí)候就會(huì)出現(xiàn)關(guān)于分布式 session 的問題档冬,也就是說多臺(tái)fd 連接問題。

單機(jī)架構(gòu)

圖片.png

從上面的箭頭我們就可以看出桃纯,雙向是可以通信的酷誓。可以來回發(fā)送信息的态坦。

分布式架構(gòu)

圖片.png
  • 假如 client1 想要單獨(dú)發(fā)送私聊的信息給 client4呛牲,這個(gè)時(shí)候怎么辦?

  • 假如 client1 想要發(fā)送群聊給所有的客戶端驮配,這個(gè)時(shí)候怎么辦?

  • 假如 后臺(tái)管理人家着茸,先要發(fā)送系統(tǒng)消息廣播給所有人壮锻。這個(gè)時(shí)候怎么辦?

模擬

nginx.conf

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server 127.0.0.1:9511;
    server 127.0.0.1:9522;
}

server {
    listen 80;
    server_name www.abc.com;
    location / {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

ws1.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9511);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判斷是否是正確的websocket連接涮阔,否則有可能會(huì)push失敗
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

ws2.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9522);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判斷是否是正確的websocket連接猜绣,否則有可能會(huì)push失敗
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

websocket測(cè)試工具

圖片.png
圖片.png

二個(gè)客戶端連接,都發(fā)送了消息敬特,但是都收不到彼此的消息掰邢,我們上面的代碼是for循環(huán)去push的,肯定是fd不在服務(wù)上才會(huì)這樣的伟阔。這個(gè)完全就符合我們之前的設(shè)想辣之。那么我們?nèi)绾尾拍茏龅较嗷ナ盏叫畔⒛兀楷F(xiàn)在開始進(jìn)入我們的主題皱炉。

方案

  • Redis

  • MQ (以RabbitMQ為例)

Redis 主要的作用用于存取用戶與服務(wù)器的關(guān)系怀估,MQ 的話主要用于多個(gè)服務(wù)器的通訊和消息共享問題。

圖片.png
圖片.png

每個(gè)服務(wù)端都訂閱自己的 queue 進(jìn)行消費(fèi)合搅。比如:client1 想要發(fā)送消息給 client4, 這個(gè)時(shí)候的步驟如下:

  • client1 發(fā)送消息給 client4多搀,先從 Redis 中獲取 client4 的fd與服務(wù)器關(guān)系。

  • 拿到關(guān)系后灾部,直接push到 MQ中康铭,{"queueName":"queue2","toId":"1","fd":"1"...} 讓消費(fèi)者監(jiān)聽處理。

  • 消費(fèi)者監(jiān)聽某個(gè) queue 赌髓,進(jìn)行消費(fèi)處理从藤。

代碼

ws01.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws01";
$port = 9511;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 發(fā)送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一條
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服務(wù)催跪,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服務(wù)推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 發(fā)消息,主要用于其他的服務(wù)調(diào)用呛哟,mq的消費(fèi)者過來的時(shí)數(shù)據(jù)
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

ws02.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws02";
$port = 9512;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 發(fā)送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一條
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服務(wù)叠荠,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服務(wù)推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 發(fā)消息,主要用于其他的服務(wù)調(diào)用扫责,mq的消費(fèi)者過來的時(shí)數(shù)據(jù)
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

consumer.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 11:50 上午
 */


require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();

$channel->exchange_declare('messages', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'messages', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    // $msg->body
    $data  = json_decode( $msg->body,true);
    Co\run(function () use($data) {
        $client = new Swoole\Coroutine\Http\Client($data["host"], $data["port"]);
        $ret = $client->upgrade("/");
        if ($ret) {
            $arr = [
                "type"=>"message",
                "fd"=>$data["fd"],
                "content"=>$data["content"]
            ];
            $client->push(json_encode($arr));
        }
    });
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

index.html

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <meta http-equiv="content-type" content="text/html;charset=utf-8">
    <style>
        p {
            text-align: left;
            padding-left: 20px;
        }
    </style>
</head>
<body>
<div style="width: 600px;height: 600px;margin: 30px auto;text-align: center">
   
    <div style="width: 600px;border: 1px solid gray;height: 300px;">
        <div id="msg_list" style="width:600px;height: 300px;overflow: scroll;float: left;">
        </div>
    </div>
    <br>
    <div style="width: 600px;height: 200px;text-align: left;">
        用戶名:<input type="text" name="touser" id="touser" placeholder="請(qǐng)輸入發(fā)送的用戶名">
        <br>
        內(nèi)容:<textarea id="msg_box" rows="6" cols="50"></textarea><br>
        <input type="button" value="發(fā)送" onclick="send()">
    </div>
</div>
</body>
</html>
<script type="text/javascript">
    var uname = prompt('請(qǐng)輸入用戶名', 'user' + uuid(8, 16));
    var ws = new WebSocket("ws://www.abc.com");
    ws.onopen = function () {
        var data = "系統(tǒng)消息:建立連接成功";
        listMsg(data);
    };

    ws.onmessage = function (e) {
        var msg = JSON.parse(e.data);
        var user_name, name_list, change_type;
        switch (msg.type) {
            case 'system':
                sender = '系統(tǒng)消息: ';
                break;
            case 'open':
                var user_info = {'type': 'login', 'userName': uname};
                sendMsg(user_info);
                return;
        }
        var data = msg.content;
        listMsg(data);
    };

    ws.onerror = function () {
        var data = "系統(tǒng)消息 : 出錯(cuò)了,請(qǐng)退出重試.";
        listMsg(data);
    };


    // 提交發(fā)送
    function send() {
        // 內(nèi)容
        var msg_box = document.getElementById("msg_box");
        var content = msg_box.value;
        // 用戶
        var touser = document.getElementById("touser");
        var touser = touser.value;

        var reg = new RegExp("\r\n", "g");
        content = content.replace(reg, "");
        var msg = {'content': content.trim(), 'type': 'sendto','from':uname,'to':touser.trim()};
        sendMsg(msg);
        msg_box.value = '';
    }

    // 消息列表
    function listMsg(data) {
        var msg_list = document.getElementById("msg_list");
        var msg = document.createElement("p");
        msg.innerHTML = data;
        msg_list.appendChild(msg);
        msg_list.scrollTop = msg_list.scrollHeight;
    }

    // 發(fā)送消息    
    function sendMsg(msg) {
        var data = JSON.stringify(msg);
        console.log(data)
        ws.send(data);
    }

    // 用戶uuid
    function uuid(len, radix) {
        var chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
        var uuid = [], i;
        radix = radix || chars.length;
        if (len) {
            for (i = 0; i < len; i++) uuid[i] = chars[0 | Math.random() * radix];
        } else {
            var r;
            uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
            uuid[14] = '4';
            for (i = 0; i < 36; i++) {
                if (!uuid[i]) {
                    r = 0 | Math.random() * 16;
                    uuid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
                }
            }
        }
        return uuid.join('');
    }
</script>

運(yùn)行

php ws01.php #啟動(dòng)ws01
php ws02.php #啟動(dòng)ws02
php consumer.php "ws01"  #監(jiān)聽ws1 queue
php consumer.php "ws02"  #監(jiān)聽ws2 queue

效果

客戶1

圖片.png

客戶2

圖片.png

客戶3

圖片.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末榛鼎,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鳖孤,更是在濱河造成了極大的恐慌者娱,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件苏揣,死亡現(xiàn)場(chǎng)離奇詭異黄鳍,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)平匈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門框沟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人增炭,你說我怎么就攤上這事忍燥。” “怎么了隙姿?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵梅垄,是天一觀的道長。 經(jīng)常有香客問我输玷,道長队丝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任欲鹏,我火速辦了婚禮机久,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘赔嚎。我一直安慰自己吞加,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布尽狠。 她就那樣靜靜地躺著衔憨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪袄膏。 梳的紋絲不亂的頭發(fā)上践图,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音沉馆,去河邊找鬼码党。 笑死德崭,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的揖盘。 我是一名探鬼主播眉厨,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼兽狭!你這毒婦竟也來了憾股?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤箕慧,失蹤者是張志新(化名)和其女友劉穎服球,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體颠焦,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡斩熊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伐庭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粉渠。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖圾另,靈堂內(nèi)的尸體忽然破棺而出霸株,到底是詐尸還是另有隱情,我是刑警寧澤盯捌,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站蘑秽,受9級(jí)特大地震影響饺著,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜肠牲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一幼衰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧缀雳,春花似錦渡嚣、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至深碱,卻和暖如春腹鹉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背敷硅。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工功咒, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留愉阎,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓力奋,卻偏偏與公主長得像榜旦,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容