ubuntu16.04 消息隊(duì)列 TP5 - PHP Resque Worker

TP5 - PHP Resque Worker

安裝指南

命令安裝

  • composer require chrisboulton/php-resque

運(yùn)行環(huán)境

  • PHP 5.2+
  • Redis 2.2+
  • TP 5.0

應(yīng)用實(shí)例指南

添加入口文件

<?php
// +----------------------------------------------------------------------
// | ResQue執(zhí)行入口文件
// +----------------------------------------------------------------------
// | 作者: Melody
// +----------------------------------------------------------------------

// [ PHP版本檢查 ]
if (version_compare(PHP_VERSION, '5.5', '<')) {
    die('PHP版本過(guò)低筋蓖,最少需要PHP5.5李破,請(qǐng)升級(jí)PHP版本!');
}
        fwrite(STDOUT, "Start init");
set_time_limit(0);
// [ 應(yīng)用入口文件 ]

// 定義應(yīng)用目錄
define('APP_PATH', __DIR__ . '/application/');
define('MODE_NAME', 'cli');     // 自定義cli模式
// 處理自定義參數(shù)
$act = isset($argv[2]) ? $argv[2] : 'start';
putenv("Q_ACTION={$act}");
putenv("Q_ARGV=" . json_encode($argv));
// 加載框架引導(dǎo)文件
require './thinkphp/start.php';

創(chuàng)建Worker類(lèi)

<?php
/**
 * Created by PhpStorm.
 * User: melody
 * Date: 2019-01-10
 * Time: 11:36
 */

namespace app\cisdi\home;
use app\common\queue;
use think\Controller;

class MsgQueue extends Controller
{
    protected $args = [];
    protected $keys = [];
    protected $queues = '*';
    public function __construct()
    {
        queue\Config::init();
        if (!IS_CLI)  die('The file can only be run in cli mode!');
        fwrite(STDOUT, getenv('Q_ARGV'));
        $argv = json_decode(getenv('Q_ARGV'));
        foreach ($argv as $item) {
            if (strpos($item, '=')) {
                list($key, $val) = explode('=', $item);
            } else {
                $key = $val = $item;
            }
            $this->keys[] = $key;
            $this->args[$key] = $val;
        }

        $this->init();
    }

    /**
     * 執(zhí)行隊(duì)列
     * 環(huán)境變量參數(shù)值:
     * --queue|QUEUE: 需要執(zhí)行的隊(duì)列的名字
     * --interval|INTERVAL:在隊(duì)列中循環(huán)的間隔時(shí)間,即完成一個(gè)任務(wù)后的等待時(shí)間航揉,默認(rèn)是5秒
     * --app|APP_INCLUDE:需要自動(dòng)載入PHP文件路徑,Worker需要知道你的Job的位置并載入Job
     * --count|COUNT:需要?jiǎng)?chuàng)建的Worker的數(shù)量弛作。所有的Worker都具有相同的屬性举娩。默認(rèn)是創(chuàng)建1個(gè)Worker
     * --debug|VVERBOSE:設(shè)置“1”啟用更啰嗦模式,會(huì)輸出詳細(xì)的調(diào)試信息
     * --pid|PIDFILE:手動(dòng)指定PID文件的位置礁哄,適用于單Worker運(yùn)行方式
     */
    private function init()
    {
        $is_sington = false; //是否單例運(yùn)行长酗,單例運(yùn)行會(huì)在tmp目錄下建立一個(gè)唯一的PID

        // 根據(jù)參數(shù)設(shè)置QUEUE環(huán)境變量
        $QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';
        if (empty($QUEUE)) {
            die("Set QUEUE env var containing the list of queues to work.\n");
        }
        $this->queues = explode(',', $QUEUE);

        // 根據(jù)參數(shù)設(shè)置INTERVAL環(huán)境變量
        $interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;
        putenv("INTERVAL={$interval}");

        // 根據(jù)參數(shù)設(shè)置COUNT環(huán)境變量
        $count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;
        putenv("COUNT={$count}");

        // 根據(jù)參數(shù)設(shè)置APP_INCLUDE環(huán)境變量
        $app = in_array('--app', $this->keys) ? $this->args['--app'] : '';
        putenv("APP_INCLUDE={$app}");

        // 根據(jù)參數(shù)設(shè)置PIDFILE環(huán)境變量
        $pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';
        putenv("PIDFILE={$pid}");

        // 根據(jù)參數(shù)設(shè)置VVERBOSE環(huán)境變量
        $debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';
        putenv("VVERBOSE={$debug}");
    }

    public function index()
    {
        fwrite(STDOUT, "Start index ");
        $act = getenv('Q_ACTION');
        switch ($act) {
            case 'stop':
                $this->stop();
                break;
            case 'status':
                $this->status();
                break;
            default:
                $this->start();
        }
    }

    /**
     * 開(kāi)始隊(duì)列
     */
    public function start()
    {
        fwrite(STDOUT, "開(kāi)始worker" . "\n");
        // 載入任務(wù)類(lèi)
        $path = JOB_PATH;
        fwrite(STDOUT, "地址". $path . "\n");
        $flag = \FilesystemIterator::KEY_AS_FILENAME;
        $glob = new \FilesystemIterator($path, $flag);
        foreach ($glob as $file) {
            if('php' === pathinfo($file, PATHINFO_EXTENSION))
                require realpath($file);
        }

        $logLevel = 0;
        $LOGGING = getenv('LOGGING');
        $VERBOSE = getenv('VERBOSE');
        $VVERBOSE = getenv('VVERBOSE');
        if (!empty($LOGGING) || !empty($VERBOSE)) {
            $logLevel = \Resque_Worker::LOG_NORMAL;
        } else {
            if (!empty($VVERBOSE)) {
                $logLevel = \Resque_Worker::LOG_VERBOSE;
            }
        }

        $APP_INCLUDE = getenv('APP_INCLUDE');
        if ($APP_INCLUDE) {
            if (!file_exists($APP_INCLUDE)) {
                die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");
            }
            require_once $APP_INCLUDE;
        }

        $interval = 5;
        $INTERVAL = getenv('INTERVAL');
        if (!empty($INTERVAL)) {
            $interval = $INTERVAL;
        }

        $count = 1;
        $COUNT = getenv('COUNT');
        if (!empty($COUNT) && $COUNT > 1) {
            $count = $COUNT;
        }

        if ($count > 1) {
            for ($i = 0; $i < $count; ++$i) {
                $pid = pcntl_fork();
                if ($pid == -1) {
                    die("Could not fork worker " . $i . "\n");
                } // Child, start the worker
                else {
                    if (!$pid) {
                        $worker = new \Resque_Worker($this->queues);
                        $worker->logLevel = $logLevel;
                        fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
                        $worker->work($interval);
                        break;
                    }
                }
            }
        } // Start a single worker
        else {
            $worker = new \Resque_Worker($this->queues);
            $worker->logLevel = $logLevel;

            $PIDFILE = getenv('PIDFILE');
            if ($PIDFILE) {
                file_put_contents($PIDFILE, getmypid()) or
                die('Could not write PID information to ' . $PIDFILE);
            }

            fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
            $worker->work($interval);
        }
    }

    /**
     * 停止隊(duì)列
     */
    public function stop()
    {
        $worker = new \Resque_Worker($this->queues);
        $worker->shutdown();
    }

    /**
     * 查看某個(gè)任務(wù)狀態(tài)
     */
    public function status()
    {
        $id = in_array('--id', $this->keys) ? $this->args['--id'] : '';
        $status = new \Resque_Job_Status($id);
        if (!$status->isTracking()) {
            die("Resque is not tracking the status of this job.\n");
        }

        echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";
        while (true) {
            fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");
            sleep(1);
        }
    }
}

創(chuàng)建Job處理類(lèi)

示例創(chuàng)建的類(lèi)為job/ComJob

<?php
/**
 * Created by PhpStorm.
 * User: melody
     */

namespace app\cisdi\job;


class ComJob
{
    /**
     * 任務(wù)執(zhí)行函數(shù)
     */
    public function perform()
    {
        $args = $this->args;
        fwrite(STDOUT, json_encode($args) . '已處理完此信息' . PHP_EOL);
    }

    /**
     * perform方法之前調(diào)用
     */
    public function setUp()
    {
        // ... Set up environment for this job
    }

    /**
     * perform方法之后調(diào)用
     */
    public function tearDown()
    {
        // ... Remove environment for this job
    }
}

添加任務(wù)示例

    /**
     * 測(cè)試添加消息隊(duì)列
     */
    public function testEnQueue(){
        $job = '\\app\\cisdi\\job\\ComJob'; // 定義任務(wù)類(lèi)
        $args = array(
            'time' => time(),
            'array' => array(
                'test' => 'test',
            ),
        );
        $jobId = \Resque::enqueue('default', $job, $args, true);
        echo "Queued job ".$jobId."\n\n";
    }

應(yīng)用案例

  • 創(chuàng)建任務(wù)
執(zhí)行testEnQueue方法(可通過(guò)cli或者web端執(zhí)行)
得到的返回信息:
Queued job fc7632470b1d7e69aede1675d79bdfe8
  • 開(kāi)啟任務(wù)處理器
root@iZwz980sm1dapjhje7e6icZ:/home/wwwroot/default/wx_test# php resque.php cisdi/msg_queue/index start
Start init應(yīng)用初始化開(kāi)始
["resque.php","cisdi\/msg_queue\/index","start"]Start index 開(kāi)始worker
地址/home/wwwroot/default/wx_test//application/cisdi/job/
*** Starting worker iZwz980sm1dapjhje7e6icZ:32471:*
{"time":1547713088,"array":{"test":"test"}}已處理完此信息

讓?xiě)?yīng)用長(zhǎng)駐內(nèi)存

  • 安裝supervisor
apt-get install supervisor
  • 添加應(yīng)用到supervisor
[program:resque]
command = php resque cisdi/msg_queue/index start &
numprocs=1
directory= 你的當(dāng)前目錄
stderr_logfile_maxbytes=10MB
stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log
stderr_logfile=/var/log/supervisor/%(program_name)s-stderr.log
redirect_stderr=true
autostart=true
autorestart=true
  • 重啟supervisor讓配置文件生效
supervisorctl reload
  • 啟動(dòng)應(yīng)用進(jìn)程
supervisorctl start
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市桐绒,隨后出現(xiàn)的幾起案子夺脾,更是在濱河造成了極大的恐慌,老刑警劉巖茉继,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咧叭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡烁竭,警方通過(guò)查閱死者的電腦和手機(jī)菲茬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人生均,你說(shuō)我怎么就攤上這事听想。” “怎么了马胧?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵汉买,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我佩脊,道長(zhǎng)蛙粘,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任威彰,我火速辦了婚禮出牧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘歇盼。我一直安慰自己舔痕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布豹缀。 她就那樣靜靜地躺著伯复,像睡著了一般。 火紅的嫁衣襯著肌膚如雪邢笙。 梳的紋絲不亂的頭發(fā)上啸如,一...
    開(kāi)封第一講書(shū)人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音氮惯,去河邊找鬼叮雳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛妇汗,可吹牛的內(nèi)容都是我干的帘不。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼杨箭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼厌均!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起告唆,我...
    開(kāi)封第一講書(shū)人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤棺弊,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后擒悬,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體模她,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年懂牧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了侈净。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尊勿。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖畜侦,靈堂內(nèi)的尸體忽然破棺而出元扔,到底是詐尸還是另有隱情,我是刑警寧澤旋膳,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布澎语,位于F島的核電站,受9級(jí)特大地震影響验懊,放射性物質(zhì)發(fā)生泄漏擅羞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一义图、第九天 我趴在偏房一處隱蔽的房頂上張望减俏。 院中可真熱鬧,春花似錦碱工、人聲如沸娃承。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)草慧。三九已至,卻和暖如春匙头,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背仔雷。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工蹂析, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人碟婆。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓电抚,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親竖共。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蝙叛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容