PHP網(wǎng)絡(luò)服務(wù)器模型 重啟worker 進(jìn)程
class Worker{
? ? //監(jiān)聽socket
? ? protected $socket = NULL;
? ? //連接事件回調(diào)
? ? public $onConnect = NULL;
? ? public? $reusePort=1;
? ? //接收消息事件回調(diào)
? ? public $onMessage = NULL;
? ? public $workerNum=3; //子進(jìn)程個數(shù)
? ? public? $allSocket; //存放所有socket
? ? public? $addr;
? ? protected $worker_pid; //子進(jìn)程pid
? ? protected? $master_pid;//主進(jìn)程id
? ? public function __construct($socket_address) {
? ? ? ? //監(jiān)聽地址+端口
? ? ? ? $this->addr=$socket_address;
? ? ? ? $this->master_pid=posix_getpid();
? ? }
? ? public function start() {
? ? ? ? //獲取配置文件
? ? ? ? $this->watch();
? ? ? ? $this->fork($this->workerNum);
? ? ? ? $this->monitorWorkers(); //監(jiān)視程序,捕獲信號,監(jiān)視worker進(jìn)程
? ? }
? ? /**
* 文件監(jiān)視,自動重啟
*/
? ? protected? function watch(){
? ? ? ? $init=inotify_init(); //初始化
? ? ? ? $files=get_included_files();
? ? ? ? foreach ($files as $file){
? ? ? ? ? ? inotify_add_watch($init,$file,IN_MODIFY); //監(jiān)視相關(guān)的文件
? ? ? ? }
? ? ? ? //監(jiān)聽
? ? ? ? swoole_event_add($init,function ($fd){
? ? ? ? ? ? $events=inotify_read($fd);
? ? ? ? ? ? if(!empty($events)){
? ? ? ? ? ? ? ? posix_kill($this->master_pid,SIGUSR1);
? ? ? ? ? ? }
? ? ? ? });
? ? }
? ? /**
* 捕獲信號
* 監(jiān)視worker進(jìn)程.拉起進(jìn)程
*/
? ? public? function monitorWorkers(){
? ? ? ? //注冊信號事件回調(diào),是不會自動執(zhí)行的
// reload
? ? ? ? pcntl_signal(SIGUSR1, array($this, 'signalHandler'),false); //重啟woker進(jìn)程信號
//ctrl+c
? ? ? ? $status=0;
? ? ? ? while (1){
? ? ? ? ? ? // 當(dāng)發(fā)現(xiàn)信號隊列,一旦發(fā)現(xiàn)有信號就會觸發(fā)進(jìn)程綁定事件回調(diào)
? ? ? ? ? ? pcntl_signal_dispatch();
? ? ? ? ? ? $pid = pcntl_wait($status); //當(dāng)信號到達(dá)之后就會被中斷
//如果進(jìn)程不是正常情況下的退出,重啟子進(jìn)程,我想要維持子進(jìn)程個數(shù)
//? ? ? ? ? ? if($pid>1 && $pid != $this->master_pid? && !pcntl_wifexited($status)){
//? ? ? ? ? ? ? ? ? ? $index=array_search($pid,$this->worker_pid);
//? ? ? ? ? ? ? ? ? ? $this->fork(1);
//? ? ? ? ? ? ? ? ? ? var_dump('拉起子進(jìn)程');
//? ? ? ? ? ? ? ? ? ? unset($this->worker_pid[$index]);
//? ? ? ? ? ? }
? ? ? ? ? ? pcntl_signal_dispatch();
? ? ? ? ? ? //進(jìn)程重啟的過程當(dāng)中會有新的信號過來,如果沒有調(diào)用pcntl_signal_dispatch,信號不會被處理
? ? ? ? }
}
? ? public function signalHandler($sigo){
? ? ? ? switch ($sigo){
? ? ? ? ? ? case SIGUSR1:
? ? ? ? ? ? ? ? $this->reload();
? ? ? ? ? ? ? ? echo "收到重啟信號";
? ? ? ? ? ? ? ? break;
? ? ? ? }
}
? ? public function fork($worker_num){
? ? ? ? for ($i=0;$i<$worker_num;$i++){
? ? ? ? ? ? $test=include 'index.php';
? ? ? ? ? ? var_dump($test);
? ? ? ? ? ? $pid=pcntl_fork(); //創(chuàng)建成功會返回子進(jìn)程id
? ? ? ? ? ? if($pid<0){
? ? ? ? ? ? ? ? exit('創(chuàng)建失敗');
? ? ? ? ? ? }else if($pid>0){
? ? ? ? ? ? ? ? //父進(jìn)程空間背率,返回子進(jìn)程id
? ? ? ? ? ? ? ? $this->worker_pid[]=$pid;
? ? ? ? ? ? }else{ //返回為0子進(jìn)程空間
? ? ? ? ? ? ? ? $this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請求
? ? ? ? ? ? ? ? exit;
? ? ? ? ? ? }
}
? ? ? ? //放在父進(jìn)程空間胧奔,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)
? ? }
? ? public? function? accept(){
? ? ? ? $opts = array(
? ? ? ? ? ? 'socket' => array(
? ? ? ? ? ? ? ? 'backlog' =>10240, //成功建立socket連接的等待個數(shù)
? ? ? ? ? ? ),
? ? ? ? );
? ? ? ? $context = stream_context_create($opts);
? ? ? ? //開啟多端口監(jiān)聽,并且實現(xiàn)負(fù)載均衡
? ? ? ? stream_context_set_option($context,'socket','so_reuseport',1);
? ? ? ? stream_context_set_option($context,'socket','so_reuseaddr',1);
? ? ? ? $this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
? ? ? ? //第一個需要監(jiān)聽的事件(服務(wù)端socket的事件),一旦監(jiān)聽到可讀事件之后會觸發(fā)
? ? ? ? swoole_event_add($this->socket,function ($fd){
? ? ? ? ? ? $clientSocket=stream_socket_accept($fd);
? ? ? ? ? ? //觸發(fā)事件的連接的回調(diào)
? ? ? ? ? ? if(!empty($clientSocket) && is_callable($this->onConnect)){
? ? ? ? ? ? ? ? call_user_func($this->onConnect,$clientSocket);
? ? ? ? ? ? }
? ? ? ? ? ? //監(jiān)聽客戶端可讀
? ? ? ? ? ? swoole_event_add($clientSocket,function ($fd){
? ? ? ? ? ? ? ? //從連接當(dāng)中讀取客戶端的內(nèi)容
? ? ? ? ? ? ? ? $buffer=fread($fd,1024);
? ? ? ? ? ? ? ? //如果數(shù)據(jù)為空杭棵,或者為false,不是資源類型
? ? ? ? ? ? ? ? if(empty($buffer)){
? ? ? ? ? ? ? ? ? ? if(!is_resource($fd) || feof($fd) ){
? ? ? ? ? ? ? ? ? ? ? ? //觸發(fā)關(guān)閉事件
? ? ? ? ? ? ? ? ? ? ? ? fclose($fd);
? ? ? ? ? ? ? ? ? ? }
}
? ? ? ? ? ? ? ? //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
? ? ? ? ? ? ? ? if(!empty($buffer) && is_callable($this->onMessage)){
? ? ? ? ? ? ? ? ? ? call_user_func($this->onMessage,$fd,$buffer);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? });
? ? }
? ? /**
* 重啟worker進(jìn)程
*/
? ? public? function reload(){
? ? ? ? foreach ($this->worker_pid as $index=>$pid){
? ? ? ? ? ? posix_kill($pid,SIGKILL); //結(jié)束進(jìn)程
? ? ? ? ? ? var_dump("殺掉的子進(jìn)程",$pid);
? ? ? ? ? ? unset($this->worker_pid[$index]);
? ? ? ? ? ? $this->fork(1); //重新拉起worker
? ? ? ? }
}
? ? //捕獲信號之后重啟worker進(jìn)程
}
//ps -ef | grep php | grep -v grep | awk '{print $2}' | xargs kill -s 9
$worker = new Worker('tcp://0.0.0.0:9800');
//開啟多進(jìn)程的端口監(jiān)聽
$worker->reusePort = true;
//連接事件
$worker->onConnect = function ($fd) {
? ? //echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
$worker->onTask = function ($fd) {
? ? //echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
? ? //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
// $a=include 'index.php';
// var_dump($a);
//var_dump($conn,$message);
? ? $content="我是peter";
? ? $http_resonse = "HTTP/1.1 200 OK\r\n";
? ? $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
? ? $http_resonse .= "Connection: keep-alive\r\n"; //連接保持
? ? $http_resonse .= "Server: php socket server\r\n";
? ? $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
? ? $http_resonse .= $content;
? ? fwrite($conn, $http_resonse);
};
$worker->start(); //啟動