基于RabbitMQ和Swoole實(shí)現(xiàn)的一個(gè)完整的異步任務(wù)系統(tǒng)

從最開始的使用redis實(shí)現(xiàn)的單進(jìn)程消費(fèi)的異步任務(wù)系統(tǒng)到加入swoole的多進(jìn)程消費(fèi)模式寸宵,現(xiàn)在恤煞,我們的異步任務(wù)系統(tǒng)終于又能邁進(jìn)一步姓赤。

因?yàn)橛辛饲懊鎯蓚€(gè)簡(jiǎn)單系統(tǒng)的經(jīng)驗(yàn)刊苍,這回基于RabbitMQ的異步任務(wù)系統(tǒng)設(shè)計(jì)的的更加完善既们,包括多進(jìn)程消費(fèi),異常重試等正什。

系統(tǒng)介紹

消費(fèi)端架構(gòu)圖

從圖中可以看到啥纸,我們這個(gè)系統(tǒng)是一個(gè)基于事件的異步任務(wù)系統(tǒng)。就是說當(dāng)一個(gè)事件產(chǎn)生時(shí)婴氮,生產(chǎn)者將事件拋給調(diào)度器斯棒,調(diào)度器負(fù)責(zé)查詢事件下有哪些任務(wù),然后將這些任務(wù)丟到相應(yīng)的隊(duì)列中主经,最后由消費(fèi)者消費(fèi)任務(wù)隊(duì)列中的任務(wù)荣暮。

在整個(gè)系統(tǒng)中主要分為三大部分

1.事件生產(chǎn)者,即產(chǎn)生消息事件的一方旨怠。
2.任務(wù)調(diào)度器(Scheduler)渠驼,負(fù)責(zé)注冊(cè)事件并調(diào)度任務(wù)。
3.消費(fèi)者(Worker)鉴腻,負(fù)責(zé)消費(fèi)任務(wù)隊(duì)列中的任務(wù)迷扇。

事件生產(chǎn)者

事件生產(chǎn)者很簡(jiǎn)單百揭,在業(yè)務(wù)系統(tǒng)中直接調(diào)用即可,代碼如下蜓席。

<?php
require_once __DIR__.'/../autoload.php';
use Asynclib\Ebats\Event;
try{
    $event = new Event('order_paied');  //定義事件
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件產(chǎn)生的參數(shù)
    $event->publish();
}catch (Exception $exc){
    echo $exc->getMessage();
}

任務(wù)調(diào)度器

調(diào)度器主要做兩件事器一,一是注冊(cè)事件,另一個(gè)是調(diào)度任務(wù)厨内。

注冊(cè)事件代碼如下:

//注冊(cè)事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//關(guān)閉未付款訂單(延遲任務(wù))
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動(dòng)發(fā)貨

這樣就注冊(cè)了兩個(gè)事件祈秕,事件下各有一個(gè)任務(wù)。

具體調(diào)度部分代碼很簡(jiǎn)單雏胃,就不多贅述请毛,有興趣的可以去看代碼。

消費(fèi)者

重頭戲來了瞭亮,一個(gè)異步任務(wù)系統(tǒng)最重要的就是消費(fèi)端了方仿,現(xiàn)在讓我們來看下Worker的流程圖。

一個(gè)完整的消費(fèi)進(jìn)程

可以看到统翩,在這里我們采用了兩個(gè)交換器和兩個(gè)隊(duì)列仙蚜,一個(gè)負(fù)責(zé)處理正常的任務(wù)即ntask,另一個(gè)負(fù)責(zé)處理需要延遲執(zhí)行的任務(wù)即dtask厂汗。簡(jiǎn)單描述下一個(gè)任務(wù)的生命周期委粉。

正常任務(wù)

1、task產(chǎn)生娶桦,進(jìn)入正常任務(wù)的交換器Exchange[ebats_core_ntask]
2贾节、交換器根據(jù)topic將任務(wù)分發(fā)到對(duì)應(yīng)的隊(duì)列中
3、子進(jìn)程ntask阻塞等待成功獲取到task趟紊,并執(zhí)行該任務(wù)
4氮双、執(zhí)行失敗,需要重試時(shí)拋出RetryException霎匈,不需要重試時(shí)拋出TaskException
5、子進(jìn)程ntask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
6送爸、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

延遲任務(wù)

1铛嘱、子進(jìn)程dtask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
2袭厂、執(zhí)行失敗墨吓,需要重試時(shí)拋出RetryException,不需要重試時(shí)拋出TaskException
3纹磺、子進(jìn)程dtask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
4帖烘、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

消費(fèi)者代碼如下:

require_once __DIR__.'/../autoload.php';
require_once __DIR__.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;

//執(zhí)行結(jié)果回調(diào)函數(shù)
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){

};
$worker = new Worker($callback);  //支持多進(jìn)程消費(fèi)默認(rèn)為1
$worker->setQueue('demo');  //隊(duì)列名和事件的topic一一對(duì)應(yīng)
$worker->run();

自定義調(diào)度器

一般來說這是一個(gè)基于事件的任務(wù)系統(tǒng),那么能不能直接產(chǎn)生任務(wù)呢橄杨。答案是肯定的秘症。

只需要?jiǎng)?chuàng)建一個(gè)自定義調(diào)度器照卦,由您自行實(shí)現(xiàn)調(diào)度邏輯,最終生成一個(gè)任務(wù)即可乡摹。代碼如下:

<?php
require_once __DIR__.'/../autoload.php';
use Asynclib\Ebats\Task;
use Asynclib\Core\Consumer;
use Asynclib\Amq\ExchangeTypes;
use Asynclib\Exception\ExceptionInterface;

/** 
 * 本示例演示了如何創(chuàng)建一個(gè)自定義調(diào)度器,開發(fā)者可以根據(jù)自身需求開發(fā)自己的任務(wù)調(diào)度器
 */
try{
    $worker = new Consumer();
    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
    $worker->run(function($key, $msg){
        $order_data = json_encode($msg);
        echo " [$key] $order_data \n";
        Task::create('demo', 'orderAsync', $msg);//創(chuàng)建任務(wù),之后消息將作為參數(shù)由任務(wù)接管處理
    });
}catch (ExceptionInterface $exc){
    echo $exc->getMessage();
}

這樣役耕,當(dāng)接收到消息時(shí)就會(huì)產(chǎn)生一個(gè)orderAsync的任務(wù),您只需要啟動(dòng)一個(gè)用來消費(fèi)這個(gè)Topic的Worker即可聪廉。

也許你會(huì)覺得這里直接寫業(yè)務(wù)邏輯的代碼就可以了瞬痘,實(shí)際上也確實(shí)可以。當(dāng)你可以忍受一個(gè)進(jìn)程慢慢消費(fèi)的時(shí)候是可以這樣做的板熊。但大多數(shù)情況下我們還是希望它能夠盡快的消費(fèi)掉框全,所以建議這里只負(fù)責(zé)創(chuàng)建任務(wù),具體任務(wù)的業(yè)務(wù)邏輯由worker去執(zhí)行干签。

廣告

https://github.com/luojilab/async-task-lib
第一次開源竣况,未來的路還很長(zhǎng),請(qǐng)大家多多關(guān)照筒严。

我是閆大伯丹泉,一只剛剛走上開源之路的程序猿
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鸭蛙,隨后出現(xiàn)的幾起案子摹恨,更是在濱河造成了極大的恐慌,老刑警劉巖娶视,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晒哄,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡肪获,警方通過查閱死者的電腦和手機(jī)寝凌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來孝赫,“玉大人较木,你說我怎么就攤上這事∏啾” “怎么了伐债?”我有些...
    開封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)致开。 經(jīng)常有香客問我峰锁,道長(zhǎng),這世上最難降的妖魔是什么双戳? 我笑而不...
    開封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任虹蒋,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘魄衅。我一直安慰自己峭竣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開白布徐绑。 她就那樣靜靜地躺著邪驮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪傲茄。 梳的紋絲不亂的頭發(fā)上毅访,一...
    開封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音盘榨,去河邊找鬼喻粹。 笑死,一個(gè)胖子當(dāng)著我的面吹牛草巡,可吹牛的內(nèi)容都是我干的守呜。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼山憨,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼查乒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起郁竟,我...
    開封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤玛迄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后棚亩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蓖议,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年讥蟆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了勒虾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瘸彤,死狀恐怖修然,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情钧栖,我是刑警寧澤低零,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站拯杠,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏啃奴。R本人自食惡果不足惜潭陪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧依溯,春花似錦老厌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至慷嗜,卻和暖如春淀弹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背庆械。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工薇溃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人缭乘。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓沐序,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親堕绩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子策幼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)奴紧,斷路器特姐,智...
    卡卡羅2017閱讀 134,601評(píng)論 18 139
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,869評(píng)論 6 13
  • 本文章是 Vert.x 藍(lán)圖系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待...
    sczyh30閱讀 2,016評(píng)論 0 10
  • 尋不到歸宿 天青色幻想風(fēng)中起舞 墜入枯萎的心埠 殘陽映染了天廬 紅滿藕花深處 飛花空中旋成弧 婉轉(zhuǎn)無處落入 風(fēng)起云...
    知憶Lee閱讀 202評(píng)論 2 10
  • 在山腳下望 似乎天空就在山頂绰寞,觸手可及 到了山頂才發(fā)現(xiàn) 原來天空那樣遙不可及 就像你那么遠(yuǎn)那么近 那我寧愿永遠(yuǎn)呆在...
    你說我是丸子君閱讀 211評(píng)論 0 0