前景
我們?cè)谧鼍W(wǎng)頁聊天的話击狮,基本上都是用到websocket去做。如果單臺(tái)服務(wù)器難以支撐的話益老,我們就會(huì)考慮加機(jī)器彪蓬,做成分布式,做成集群模式捺萌。這個(gè)時(shí)候就會(huì)出現(xiàn)關(guān)于分布式 session
的問題档冬,也就是說多臺(tái)fd
連接問題。
單機(jī)架構(gòu)
從上面的箭頭我們就可以看出桃纯,雙向是可以通信的酷誓。可以來回發(fā)送信息的态坦。
分布式架構(gòu)
假如
client1
想要單獨(dú)發(fā)送私聊的信息給client4
呛牲,這個(gè)時(shí)候怎么辦?假如
client1
想要發(fā)送群聊給所有的客戶端驮配,這個(gè)時(shí)候怎么辦?假如 后臺(tái)管理人家着茸,先要發(fā)送系統(tǒng)消息廣播給所有人壮锻。這個(gè)時(shí)候怎么辦?
模擬
nginx.conf
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream websocket {
server 127.0.0.1:9511;
server 127.0.0.1:9522;
}
server {
listen 80;
server_name www.abc.com;
location / {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
ws1.php
<?php
/***
* User: gan
* Date: 2019/11/2
* Time: 9:21 上午
*/
$server = new Swoole\WebSocket\Server("0.0.0.0", 9511);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
$server->push($request->fd, "hello, welcome\n");
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
foreach ($server->connections as $fd) {
// 需要先判斷是否是正確的websocket連接涮阔,否則有可能會(huì)push失敗
if ($server->isEstablished($fd)) {
$server->push($fd, $frame->data);
}
}
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
ws2.php
<?php
/***
* User: gan
* Date: 2019/11/2
* Time: 9:21 上午
*/
$server = new Swoole\WebSocket\Server("0.0.0.0", 9522);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
$server->push($request->fd, "hello, welcome\n");
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
foreach ($server->connections as $fd) {
// 需要先判斷是否是正確的websocket連接猜绣,否則有可能會(huì)push失敗
if ($server->isEstablished($fd)) {
$server->push($fd, $frame->data);
}
}
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
websocket測(cè)試工具
二個(gè)客戶端連接,都發(fā)送了消息敬特,但是都收不到彼此的消息掰邢,我們上面的代碼是for循環(huán)去push的,肯定是fd不在服務(wù)上才會(huì)這樣的伟阔。這個(gè)完全就符合我們之前的設(shè)想辣之。那么我們?nèi)绾尾拍茏龅较嗷ナ盏叫畔⒛兀楷F(xiàn)在開始進(jìn)入我們的主題皱炉。
方案
Redis
MQ (以RabbitMQ為例)
Redis
主要的作用用于存取用戶與服務(wù)器的關(guān)系怀估,MQ
的話主要用于多個(gè)服務(wù)器的通訊和消息共享問題。
每個(gè)服務(wù)端都訂閱自己的 queue
進(jìn)行消費(fèi)合搅。比如:client1
想要發(fā)送消息給 client4
, 這個(gè)時(shí)候的步驟如下:
client1
發(fā)送消息給client4
多搀,先從Redis
中獲取client4
的fd與服務(wù)器關(guān)系。拿到關(guān)系后灾部,直接push到 MQ中康铭,
{"queueName":"queue2","toId":"1","fd":"1"...}
讓消費(fèi)者監(jiān)聽處理。消費(fèi)者監(jiān)聽某個(gè)
queue
赌髓,進(jìn)行消費(fèi)處理从藤。
代碼
ws01.php
<?php
/***
* User: gan
* Date: 2019/11/2
* Time: 9:21 上午
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$host = "ws01";
$port = 9511;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
$server->push($request->fd, json_encode(["type"=>"open"]));
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {
$data = $frame->data;
$data = json_decode($data,true);
switch ($data["type"]) {
case 'login':
$redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
$redis->set($host.':'.$frame->fd,$data["userName"]);
$server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
break;
case 'sendto':
// 接收者
$to = $redis->get($data["to"]);
$to = json_decode($to,true);
// 發(fā)送者
$from = $redis->get($data["from"]);
$from = json_decode($from,true);
// 首先自己推送一條
if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
$server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
}
// 本服務(wù)催跪,不同人
if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
$server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
}
// 不在本服務(wù)推到mq
if ($to["host"] != $host && $to && $from) {
// 推送rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();
$channel->exchange_declare('messages', 'topic', false, false, false);
$msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
$channel->basic_publish($msg, 'messages', $to["host"]);
$channel->close();
$connection->close();
break;
}
break;
case 'message':
// 發(fā)消息,主要用于其他的服務(wù)調(diào)用呛哟,mq的消費(fèi)者過來的時(shí)數(shù)據(jù)
if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
$server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
}
break;
}
});
$server->on('close', function ($ser, $fd) use($redis) {
$userName = $redis->get($fd);
$redis->del($userName);
$redis->del($host.":".$fd);
echo "client {$fd} closed\n";
});
$server->start();
ws02.php
<?php
/***
* User: gan
* Date: 2019/11/2
* Time: 9:21 上午
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$host = "ws02";
$port = 9512;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
$server->push($request->fd, json_encode(["type"=>"open"]));
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {
$data = $frame->data;
$data = json_decode($data,true);
switch ($data["type"]) {
case 'login':
$redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
$redis->set($host.':'.$frame->fd,$data["userName"]);
$server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
break;
case 'sendto':
// 接收者
$to = $redis->get($data["to"]);
$to = json_decode($to,true);
// 發(fā)送者
$from = $redis->get($data["from"]);
$from = json_decode($from,true);
// 首先自己推送一條
if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
$server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
}
// 本服務(wù)叠荠,不同人
if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
$server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
}
// 不在本服務(wù)推到mq
if ($to["host"] != $host && $to && $from) {
// 推送rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();
$channel->exchange_declare('messages', 'topic', false, false, false);
$msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
$channel->basic_publish($msg, 'messages', $to["host"]);
$channel->close();
$connection->close();
break;
}
break;
case 'message':
// 發(fā)消息,主要用于其他的服務(wù)調(diào)用扫责,mq的消費(fèi)者過來的時(shí)數(shù)據(jù)
if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
$server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
}
break;
}
});
$server->on('close', function ($ser, $fd) use($redis) {
$userName = $redis->get($fd);
$redis->del($userName);
$redis->del($host.":".$fd);
echo "client {$fd} closed\n";
});
$server->start();
consumer.php
<?php
/***
* User: gan
* Date: 2019/11/2
* Time: 11:50 上午
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();
$channel->exchange_declare('messages', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'messages', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
// $msg->body
$data = json_decode( $msg->body,true);
Co\run(function () use($data) {
$client = new Swoole\Coroutine\Http\Client($data["host"], $data["port"]);
$ret = $client->upgrade("/");
if ($ret) {
$arr = [
"type"=>"message",
"fd"=>$data["fd"],
"content"=>$data["content"]
];
$client->push(json_encode($arr));
}
});
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
index.html
<!DOCTYPE html>
<html>
<head>
<title></title>
<meta http-equiv="content-type" content="text/html;charset=utf-8">
<style>
p {
text-align: left;
padding-left: 20px;
}
</style>
</head>
<body>
<div style="width: 600px;height: 600px;margin: 30px auto;text-align: center">
<div style="width: 600px;border: 1px solid gray;height: 300px;">
<div id="msg_list" style="width:600px;height: 300px;overflow: scroll;float: left;">
</div>
</div>
<br>
<div style="width: 600px;height: 200px;text-align: left;">
用戶名:<input type="text" name="touser" id="touser" placeholder="請(qǐng)輸入發(fā)送的用戶名">
<br>
內(nèi)容:<textarea id="msg_box" rows="6" cols="50"></textarea><br>
<input type="button" value="發(fā)送" onclick="send()">
</div>
</div>
</body>
</html>
<script type="text/javascript">
var uname = prompt('請(qǐng)輸入用戶名', 'user' + uuid(8, 16));
var ws = new WebSocket("ws://www.abc.com");
ws.onopen = function () {
var data = "系統(tǒng)消息:建立連接成功";
listMsg(data);
};
ws.onmessage = function (e) {
var msg = JSON.parse(e.data);
var user_name, name_list, change_type;
switch (msg.type) {
case 'system':
sender = '系統(tǒng)消息: ';
break;
case 'open':
var user_info = {'type': 'login', 'userName': uname};
sendMsg(user_info);
return;
}
var data = msg.content;
listMsg(data);
};
ws.onerror = function () {
var data = "系統(tǒng)消息 : 出錯(cuò)了,請(qǐng)退出重試.";
listMsg(data);
};
// 提交發(fā)送
function send() {
// 內(nèi)容
var msg_box = document.getElementById("msg_box");
var content = msg_box.value;
// 用戶
var touser = document.getElementById("touser");
var touser = touser.value;
var reg = new RegExp("\r\n", "g");
content = content.replace(reg, "");
var msg = {'content': content.trim(), 'type': 'sendto','from':uname,'to':touser.trim()};
sendMsg(msg);
msg_box.value = '';
}
// 消息列表
function listMsg(data) {
var msg_list = document.getElementById("msg_list");
var msg = document.createElement("p");
msg.innerHTML = data;
msg_list.appendChild(msg);
msg_list.scrollTop = msg_list.scrollHeight;
}
// 發(fā)送消息
function sendMsg(msg) {
var data = JSON.stringify(msg);
console.log(data)
ws.send(data);
}
// 用戶uuid
function uuid(len, radix) {
var chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
var uuid = [], i;
radix = radix || chars.length;
if (len) {
for (i = 0; i < len; i++) uuid[i] = chars[0 | Math.random() * radix];
} else {
var r;
uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
uuid[14] = '4';
for (i = 0; i < 36; i++) {
if (!uuid[i]) {
r = 0 | Math.random() * 16;
uuid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
}
}
}
return uuid.join('');
}
</script>
運(yùn)行
php ws01.php #啟動(dòng)ws01
php ws02.php #啟動(dòng)ws02
php consumer.php "ws01" #監(jiān)聽ws1 queue
php consumer.php "ws02" #監(jiān)聽ws2 queue