TP5 - PHP Resque Worker
安裝指南
命令安裝
- composer require chrisboulton/php-resque
運(yùn)行環(huán)境
- PHP 5.2+
- Redis 2.2+
- TP 5.0
應(yīng)用實(shí)例指南
添加入口文件
<?php
// +----------------------------------------------------------------------
// | ResQue執(zhí)行入口文件
// +----------------------------------------------------------------------
// | 作者: Melody
// +----------------------------------------------------------------------
// [ PHP版本檢查 ]
if (version_compare(PHP_VERSION, '5.5', '<')) {
die('PHP版本過(guò)低筋蓖,最少需要PHP5.5李破,請(qǐng)升級(jí)PHP版本!');
}
fwrite(STDOUT, "Start init");
set_time_limit(0);
// [ 應(yīng)用入口文件 ]
// 定義應(yīng)用目錄
define('APP_PATH', __DIR__ . '/application/');
define('MODE_NAME', 'cli'); // 自定義cli模式
// 處理自定義參數(shù)
$act = isset($argv[2]) ? $argv[2] : 'start';
putenv("Q_ACTION={$act}");
putenv("Q_ARGV=" . json_encode($argv));
// 加載框架引導(dǎo)文件
require './thinkphp/start.php';
創(chuàng)建Worker類(lèi)
<?php
/**
* Created by PhpStorm.
* User: melody
* Date: 2019-01-10
* Time: 11:36
*/
namespace app\cisdi\home;
use app\common\queue;
use think\Controller;
class MsgQueue extends Controller
{
protected $args = [];
protected $keys = [];
protected $queues = '*';
public function __construct()
{
queue\Config::init();
if (!IS_CLI) die('The file can only be run in cli mode!');
fwrite(STDOUT, getenv('Q_ARGV'));
$argv = json_decode(getenv('Q_ARGV'));
foreach ($argv as $item) {
if (strpos($item, '=')) {
list($key, $val) = explode('=', $item);
} else {
$key = $val = $item;
}
$this->keys[] = $key;
$this->args[$key] = $val;
}
$this->init();
}
/**
* 執(zhí)行隊(duì)列
* 環(huán)境變量參數(shù)值:
* --queue|QUEUE: 需要執(zhí)行的隊(duì)列的名字
* --interval|INTERVAL:在隊(duì)列中循環(huán)的間隔時(shí)間,即完成一個(gè)任務(wù)后的等待時(shí)間航揉,默認(rèn)是5秒
* --app|APP_INCLUDE:需要自動(dòng)載入PHP文件路徑,Worker需要知道你的Job的位置并載入Job
* --count|COUNT:需要?jiǎng)?chuàng)建的Worker的數(shù)量弛作。所有的Worker都具有相同的屬性举娩。默認(rèn)是創(chuàng)建1個(gè)Worker
* --debug|VVERBOSE:設(shè)置“1”啟用更啰嗦模式,會(huì)輸出詳細(xì)的調(diào)試信息
* --pid|PIDFILE:手動(dòng)指定PID文件的位置礁哄,適用于單Worker運(yùn)行方式
*/
private function init()
{
$is_sington = false; //是否單例運(yùn)行长酗,單例運(yùn)行會(huì)在tmp目錄下建立一個(gè)唯一的PID
// 根據(jù)參數(shù)設(shè)置QUEUE環(huán)境變量
$QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';
if (empty($QUEUE)) {
die("Set QUEUE env var containing the list of queues to work.\n");
}
$this->queues = explode(',', $QUEUE);
// 根據(jù)參數(shù)設(shè)置INTERVAL環(huán)境變量
$interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;
putenv("INTERVAL={$interval}");
// 根據(jù)參數(shù)設(shè)置COUNT環(huán)境變量
$count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;
putenv("COUNT={$count}");
// 根據(jù)參數(shù)設(shè)置APP_INCLUDE環(huán)境變量
$app = in_array('--app', $this->keys) ? $this->args['--app'] : '';
putenv("APP_INCLUDE={$app}");
// 根據(jù)參數(shù)設(shè)置PIDFILE環(huán)境變量
$pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';
putenv("PIDFILE={$pid}");
// 根據(jù)參數(shù)設(shè)置VVERBOSE環(huán)境變量
$debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';
putenv("VVERBOSE={$debug}");
}
public function index()
{
fwrite(STDOUT, "Start index ");
$act = getenv('Q_ACTION');
switch ($act) {
case 'stop':
$this->stop();
break;
case 'status':
$this->status();
break;
default:
$this->start();
}
}
/**
* 開(kāi)始隊(duì)列
*/
public function start()
{
fwrite(STDOUT, "開(kāi)始worker" . "\n");
// 載入任務(wù)類(lèi)
$path = JOB_PATH;
fwrite(STDOUT, "地址". $path . "\n");
$flag = \FilesystemIterator::KEY_AS_FILENAME;
$glob = new \FilesystemIterator($path, $flag);
foreach ($glob as $file) {
if('php' === pathinfo($file, PATHINFO_EXTENSION))
require realpath($file);
}
$logLevel = 0;
$LOGGING = getenv('LOGGING');
$VERBOSE = getenv('VERBOSE');
$VVERBOSE = getenv('VVERBOSE');
if (!empty($LOGGING) || !empty($VERBOSE)) {
$logLevel = \Resque_Worker::LOG_NORMAL;
} else {
if (!empty($VVERBOSE)) {
$logLevel = \Resque_Worker::LOG_VERBOSE;
}
}
$APP_INCLUDE = getenv('APP_INCLUDE');
if ($APP_INCLUDE) {
if (!file_exists($APP_INCLUDE)) {
die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");
}
require_once $APP_INCLUDE;
}
$interval = 5;
$INTERVAL = getenv('INTERVAL');
if (!empty($INTERVAL)) {
$interval = $INTERVAL;
}
$count = 1;
$COUNT = getenv('COUNT');
if (!empty($COUNT) && $COUNT > 1) {
$count = $COUNT;
}
if ($count > 1) {
for ($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork();
if ($pid == -1) {
die("Could not fork worker " . $i . "\n");
} // Child, start the worker
else {
if (!$pid) {
$worker = new \Resque_Worker($this->queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
$worker->work($interval);
break;
}
}
}
} // Start a single worker
else {
$worker = new \Resque_Worker($this->queues);
$worker->logLevel = $logLevel;
$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
die('Could not write PID information to ' . $PIDFILE);
}
fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
$worker->work($interval);
}
}
/**
* 停止隊(duì)列
*/
public function stop()
{
$worker = new \Resque_Worker($this->queues);
$worker->shutdown();
}
/**
* 查看某個(gè)任務(wù)狀態(tài)
*/
public function status()
{
$id = in_array('--id', $this->keys) ? $this->args['--id'] : '';
$status = new \Resque_Job_Status($id);
if (!$status->isTracking()) {
die("Resque is not tracking the status of this job.\n");
}
echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";
while (true) {
fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");
sleep(1);
}
}
}
創(chuàng)建Job處理類(lèi)
示例創(chuàng)建的類(lèi)為job/ComJob
<?php
/**
* Created by PhpStorm.
* User: melody
*/
namespace app\cisdi\job;
class ComJob
{
/**
* 任務(wù)執(zhí)行函數(shù)
*/
public function perform()
{
$args = $this->args;
fwrite(STDOUT, json_encode($args) . '已處理完此信息' . PHP_EOL);
}
/**
* perform方法之前調(diào)用
*/
public function setUp()
{
// ... Set up environment for this job
}
/**
* perform方法之后調(diào)用
*/
public function tearDown()
{
// ... Remove environment for this job
}
}
添加任務(wù)示例
/**
* 測(cè)試添加消息隊(duì)列
*/
public function testEnQueue(){
$job = '\\app\\cisdi\\job\\ComJob'; // 定義任務(wù)類(lèi)
$args = array(
'time' => time(),
'array' => array(
'test' => 'test',
),
);
$jobId = \Resque::enqueue('default', $job, $args, true);
echo "Queued job ".$jobId."\n\n";
}
應(yīng)用案例
- 創(chuàng)建任務(wù)
執(zhí)行testEnQueue方法(可通過(guò)cli或者web端執(zhí)行)
得到的返回信息:
Queued job fc7632470b1d7e69aede1675d79bdfe8
- 開(kāi)啟任務(wù)處理器
root@iZwz980sm1dapjhje7e6icZ:/home/wwwroot/default/wx_test# php resque.php cisdi/msg_queue/index start
Start init應(yīng)用初始化開(kāi)始
["resque.php","cisdi\/msg_queue\/index","start"]Start index 開(kāi)始worker
地址/home/wwwroot/default/wx_test//application/cisdi/job/
*** Starting worker iZwz980sm1dapjhje7e6icZ:32471:*
{"time":1547713088,"array":{"test":"test"}}已處理完此信息
讓?xiě)?yīng)用長(zhǎng)駐內(nèi)存
- 安裝supervisor
apt-get install supervisor
- 添加應(yīng)用到supervisor
[program:resque]
command = php resque cisdi/msg_queue/index start &
numprocs=1
directory= 你的當(dāng)前目錄
stderr_logfile_maxbytes=10MB
stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log
stderr_logfile=/var/log/supervisor/%(program_name)s-stderr.log
redirect_stderr=true
autostart=true
autorestart=true
- 重啟supervisor讓配置文件生效
supervisorctl reload
- 啟動(dòng)應(yīng)用進(jìn)程
supervisorctl start