從最開始的使用redis實(shí)現(xiàn)的單進(jìn)程消費(fèi)的異步任務(wù)系統(tǒng)到加入swoole的多進(jìn)程消費(fèi)模式寸宵,現(xiàn)在恤煞,我們的異步任務(wù)系統(tǒng)終于又能邁進(jìn)一步姓赤。
因?yàn)橛辛饲懊鎯蓚€(gè)簡(jiǎn)單系統(tǒng)的經(jīng)驗(yàn)刊苍,這回基于RabbitMQ的異步任務(wù)系統(tǒng)設(shè)計(jì)的的更加完善既们,包括多進(jìn)程消費(fèi),異常重試等正什。
系統(tǒng)介紹
從圖中可以看到啥纸,我們這個(gè)系統(tǒng)是一個(gè)基于事件的異步任務(wù)系統(tǒng)。就是說當(dāng)一個(gè)事件產(chǎn)生時(shí)婴氮,生產(chǎn)者將事件拋給調(diào)度器斯棒,調(diào)度器負(fù)責(zé)查詢事件下有哪些任務(wù),然后將這些任務(wù)丟到相應(yīng)的隊(duì)列中主经,最后由消費(fèi)者消費(fèi)任務(wù)隊(duì)列中的任務(wù)荣暮。
在整個(gè)系統(tǒng)中主要分為三大部分
1.事件生產(chǎn)者,即產(chǎn)生消息事件的一方旨怠。
2.任務(wù)調(diào)度器(Scheduler)渠驼,負(fù)責(zé)注冊(cè)事件并調(diào)度任務(wù)。
3.消費(fèi)者(Worker)鉴腻,負(fù)責(zé)消費(fèi)任務(wù)隊(duì)列中的任務(wù)迷扇。
事件生產(chǎn)者
事件生產(chǎn)者很簡(jiǎn)單百揭,在業(yè)務(wù)系統(tǒng)中直接調(diào)用即可,代碼如下蜓席。
<?php
require_once __DIR__.'/../autoload.php';
use Asynclib\Ebats\Event;
try{
$event = new Event('order_paied'); //定義事件
$event->setOptions(['order_id' => 'FB138020392193312']); //事件產(chǎn)生的參數(shù)
$event->publish();
}catch (Exception $exc){
echo $exc->getMessage();
}
任務(wù)調(diào)度器
調(diào)度器主要做兩件事器一,一是注冊(cè)事件,另一個(gè)是調(diào)度任務(wù)厨内。
注冊(cè)事件代碼如下:
//注冊(cè)事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//關(guān)閉未付款訂單(延遲任務(wù))
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動(dòng)發(fā)貨
這樣就注冊(cè)了兩個(gè)事件祈秕,事件下各有一個(gè)任務(wù)。
具體調(diào)度部分代碼很簡(jiǎn)單雏胃,就不多贅述请毛,有興趣的可以去看代碼。
消費(fèi)者
重頭戲來了瞭亮,一個(gè)異步任務(wù)系統(tǒng)最重要的就是消費(fèi)端了方仿,現(xiàn)在讓我們來看下Worker的流程圖。
可以看到统翩,在這里我們采用了兩個(gè)交換器和兩個(gè)隊(duì)列仙蚜,一個(gè)負(fù)責(zé)處理正常的任務(wù)即ntask,另一個(gè)負(fù)責(zé)處理需要延遲執(zhí)行的任務(wù)即dtask厂汗。簡(jiǎn)單描述下一個(gè)任務(wù)的生命周期委粉。
正常任務(wù)
1、task產(chǎn)生娶桦,進(jìn)入正常任務(wù)的交換器Exchange[ebats_core_ntask]
2贾节、交換器根據(jù)topic將任務(wù)分發(fā)到對(duì)應(yīng)的隊(duì)列中
3、子進(jìn)程ntask阻塞等待成功獲取到task趟紊,并執(zhí)行該任務(wù)
4氮双、執(zhí)行失敗,需要重試時(shí)拋出RetryException霎匈,不需要重試時(shí)拋出TaskException
5、子進(jìn)程ntask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
6送爸、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看
延遲任務(wù)
1铛嘱、子進(jìn)程dtask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
2袭厂、執(zhí)行失敗墨吓,需要重試時(shí)拋出RetryException,不需要重試時(shí)拋出TaskException
3纹磺、子進(jìn)程dtask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
4帖烘、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看
消費(fèi)者代碼如下:
require_once __DIR__.'/../autoload.php';
require_once __DIR__.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;
//執(zhí)行結(jié)果回調(diào)函數(shù)
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
};
$worker = new Worker($callback); //支持多進(jìn)程消費(fèi)默認(rèn)為1
$worker->setQueue('demo'); //隊(duì)列名和事件的topic一一對(duì)應(yīng)
$worker->run();
自定義調(diào)度器
一般來說這是一個(gè)基于事件的任務(wù)系統(tǒng),那么能不能直接產(chǎn)生任務(wù)呢橄杨。答案是肯定的秘症。
只需要?jiǎng)?chuàng)建一個(gè)自定義調(diào)度器照卦,由您自行實(shí)現(xiàn)調(diào)度邏輯,最終生成一個(gè)任務(wù)即可乡摹。代碼如下:
<?php
require_once __DIR__.'/../autoload.php';
use Asynclib\Ebats\Task;
use Asynclib\Core\Consumer;
use Asynclib\Amq\ExchangeTypes;
use Asynclib\Exception\ExceptionInterface;
/**
* 本示例演示了如何創(chuàng)建一個(gè)自定義調(diào)度器,開發(fā)者可以根據(jù)自身需求開發(fā)自己的任務(wù)調(diào)度器
*/
try{
$worker = new Consumer();
$worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
$worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
$worker->run(function($key, $msg){
$order_data = json_encode($msg);
echo " [$key] $order_data \n";
Task::create('demo', 'orderAsync', $msg);//創(chuàng)建任務(wù),之后消息將作為參數(shù)由任務(wù)接管處理
});
}catch (ExceptionInterface $exc){
echo $exc->getMessage();
}
這樣役耕,當(dāng)接收到消息時(shí)就會(huì)產(chǎn)生一個(gè)orderAsync的任務(wù),您只需要啟動(dòng)一個(gè)用來消費(fèi)這個(gè)Topic的Worker即可聪廉。
也許你會(huì)覺得這里直接寫業(yè)務(wù)邏輯的代碼就可以了瞬痘,實(shí)際上也確實(shí)可以。當(dāng)你可以忍受一個(gè)進(jìn)程慢慢消費(fèi)的時(shí)候是可以這樣做的板熊。但大多數(shù)情況下我們還是希望它能夠盡快的消費(fèi)掉框全,所以建議這里只負(fù)責(zé)創(chuàng)建任務(wù),具體任務(wù)的業(yè)務(wù)邏輯由worker去執(zhí)行干签。
廣告
https://github.com/luojilab/async-task-lib
第一次開源竣况,未來的路還很長(zhǎng),請(qǐng)大家多多關(guān)照筒严。