介紹:
Beanstalkd,一個(gè)高性能、輕量級(jí)的分布式內(nèi)存隊(duì)列系統(tǒng)叛买,最初設(shè)計(jì)的目的是想通過后臺(tái)異步執(zhí)行耗時(shí)的任務(wù)來降低高容量Web應(yīng)用系統(tǒng)的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應(yīng)用到忽。后來開源,現(xiàn)在有PostRank大規(guī)模部署和使用清寇,每天處理百萬級(jí)任務(wù)喘漏。Beanstalkd是典型的類Memcached設(shè)計(jì),協(xié)議和使用方式都是同樣的風(fēng)格华烟,所以使用過memcached的用戶會(huì)覺得Beanstalkd似曾相識(shí)翩迈。
安裝
官網(wǎng)
https://kr.github.io/beanstalkd/
安裝
centos
yum install beanstalkd --enablerepo=epel
源碼:
tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.10.tar.gz
cd beanstalkd
make install PERFIX=/usr/bin/beanstalkd
參考:http://kr.github.io/beanstalkd/download.html
啟動(dòng)
/usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog -F &
beanstalkd參數(shù):
/usr/bin/beanstalkd -h
Use: /usr/bin/beanstalkd [OPTIONS]
Options:
-b 開啟binlog,斷電后重啟會(huì)自動(dòng)恢復(fù)任務(wù)盔夜。
-f MS fsync最多每MS毫秒
-F從不fsync(默認(rèn))
-l ADDR偵聽地址(默認(rèn)為0.0.0.0)
-p端口偵聽端口(默認(rèn)為11300)
-u USER成為用戶和組
-z BYTES設(shè)置最大作業(yè)大懈核恰(以字節(jié)為單位)(默認(rèn)值為65535)
-s BYTES設(shè)置每個(gè)wal文件的大小(默認(rèn)為10485760) (將被舍入到512字節(jié)的倍數(shù))
-c壓縮binlog(默認(rèn))
- n 不要壓縮binlog
-v顯示版本信息
-V增加冗長(zhǎng)度
-h顯示這個(gè)幫助
4喂链、配置文件
/etc/sysconfig/beanstalkd
概念:
核心概念
- job:一個(gè)需要異步處理的任務(wù)返十,是 Beanstalkd 中的基本單元,需要放在一個(gè) tube 中椭微。
- tube:一個(gè)有名的任務(wù)隊(duì)列洞坑,用來存儲(chǔ)統(tǒng)一類型的 job,是 producer 和 consumer 操作的對(duì)象蝇率。
- producer:Job 的生產(chǎn)者迟杂,通過 put 命令來將一個(gè) job 放到一個(gè) tube 中。
- consumer:Job的消費(fèi)者本慕,通過 reserve/release/bury/delete 命令來獲取 job 或改變 job 的狀態(tài)排拷。
job 的生命周期
當(dāng)producer直接put一個(gè)job時(shí),job就處于READY狀態(tài)锅尘,等待consumer來處理监氢,如果選擇延遲put,job就先到DELAYED狀態(tài),等待時(shí)間過后才遷移到READY狀態(tài)忙菠。consumer獲取了當(dāng)前READY的job后何鸡,該job的狀態(tài)就遷移到RESERVED,這樣其他的consumer就不能再操作該job牛欢。當(dāng)consumer完成該job后,可以選擇delete, release或者bury操作淆游;delete之后,job從系統(tǒng)消亡犹菱,之后不能再獲取腊脱;release操作可以重新把該job狀態(tài)遷移回READY(也可以延遲該狀態(tài)遷移操作),使其他的consumer可以繼續(xù)獲取和執(zhí)行該job陕凹;有意思的是bury操作悍抑,可以把該job休眠,等到需要的時(shí)候搜骡,再將休眠的job kick回READY狀態(tài)佑女,也可以delete BURIED狀態(tài)的job记靡。正是有這些有趣的操作和狀態(tài),才可以基于此做出很多意思的應(yīng)用团驱,比如要實(shí)現(xiàn)一個(gè)循環(huán)隊(duì)列摸吠,就可以將RESERVED狀態(tài)的job休眠掉嚎花,等沒有READY狀態(tài)的job時(shí)再將BURIED狀態(tài)的job一次性kick回READY狀態(tài)。
- READY - 需要立即處理的任務(wù)贩幻,當(dāng)延時(shí) (DELAYED) 任務(wù)到期后會(huì)自動(dòng)成為當(dāng)前任務(wù);
- DELAYED - 延遲執(zhí)行的任務(wù), 當(dāng)消費(fèi)者處理任務(wù)后, 可以用將消息再次放回 DELAYED 隊(duì)列延遲執(zhí)行族壳;
- RESERVED - 已經(jīng)被消費(fèi)者獲取, 正在執(zhí)行的任務(wù)趣些。Beanstalkd 負(fù)責(zé)檢查任務(wù)是否在 TTR(time-to-run) 內(nèi)完成;
- BURIED - 保留的任務(wù): 任務(wù)不會(huì)被執(zhí)行,也不會(huì)消失锦亦,除非有人把它 "踢" 回隊(duì)列令境;
- DELETED - 消息被徹底刪除。Beanstalkd 不再維持這些消息抛蚁。
一些特性
優(yōu)先級(jí)
任務(wù) (job) 可以有 0~2^32 個(gè)優(yōu)先級(jí), 0 代表最高優(yōu)先級(jí)惕橙,默認(rèn)優(yōu)先級(jí)為1024。
持久化
可以通過binlog將job及其狀態(tài)記錄到文件里面弥鹦,在Beanstalkd下次啟動(dòng)時(shí)可以通過讀取binlog來恢復(fù)之前的job及狀態(tài)。
分布式容錯(cuò)
分布式設(shè)計(jì)和Memcached類似朦促,beanstalkd各個(gè)server之間并不知道彼此的存在苍鲜,都是通過client來實(shí)現(xiàn)分布式以及根據(jù)tube名稱去特定server獲取job。
超時(shí)控制
為了防止某個(gè)consumer長(zhǎng)時(shí)間占用任務(wù)但不能處理的情況洒疚,Beanstalkd為reserve操作設(shè)置了timeout時(shí)間坯屿,如果該consumer不能在指定時(shí)間內(nèi)完成job,job將被遷移回READY狀態(tài)领跛,供其他consumer執(zhí)行。
客戶端操作
項(xiàng)目地址:https://github.com/pda/pheanstalk/
1吠昭、向隊(duì)列中添加job
<?php
//創(chuàng)建隊(duì)列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName='user_eamil_message_list';
$jobData=array(
'uid' => time(),
'email' => 'wukong@qq.com',
'message' => 'Hello World !!',
'dtime' => date('Y-m-d H:i:s'),
);
$pheanstalk ->useTube( $tubeName) ->put( json_encode( $jobData));
echo json_encode($jobData).PHP_EOL;
echo 'Success ~~'.PHP_EOL;
2矢棚、從隊(duì)列中取出job
<?php
//消費(fèi)隊(duì)列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName='user_eamil_message_list';
while(true){
//獲取隊(duì)列信息,reserve 阻塞獲取
$job = $pheanstalk ->watch($tubeName) ->ignore('default') ->reserve();
$data=$job->getData();
//執(zhí)行相關(guān)邏輯代碼
$ret = file_put_contents('./send_mail.log',$data,FILE_APPEND | LOCK_EX);
if( $ret ){
echo '執(zhí)行成功'.PHP_EOL.$data.PHP_EOL;
$pheanstalk->delete($job);
}
//暫停(不可能是百分百的準(zhǔn)確,跟系統(tǒng)的調(diào)度蘑拯、CPU時(shí)鐘周期等有一定關(guān)系)
usleep(500000);
}
echo 'Success ~~'.PHP_EOL;
3、檢查服務(wù)狀態(tài)
<?php
//監(jiān)控服務(wù)狀態(tài)
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump($isAlive);
/**
可以開發(fā)監(jiān)控面板弯蚜,監(jiān)控?cái)?shù)據(jù)的剃法,有多少tube,多少隊(duì)列贷洲,多少延遲等等
//查看beanstalkd狀態(tài)
//var_dump($pheanstalk->stats());
//查看有多少個(gè)tube
//var_dump($pheanstalk->listTubes());
//設(shè)置要監(jiān)聽的tube
$pheanstalk->watch('test');
//取消對(duì)默認(rèn)tube的監(jiān)聽,可以省略
$pheanstalk->ignore('default');
//查看監(jiān)聽的tube列表
//var_dump($pheanstalk->listTubesWatched());
//查看test的tube當(dāng)前的狀態(tài)
//var_dump($pheanstalk->statsTube('test'));
*/
測(cè)試:
生產(chǎn)消息:
$ php producer.php
{"uid":1499569740,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:00"}
Success ~~
php producer.php
{"uid":1499569742,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:02"}
Success ~~
php producer.php
{"uid":1499569744,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:04"}
Success ~~
消費(fèi)消息:
$ php consumer.php
執(zhí)行成功
{"uid":1499569740,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:00"}
執(zhí)行成功
{"uid":1499569742,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:02"}
執(zhí)行成功
{"uid":1499569744,"email":"wukong@qq.com","message":"Hello World !!","dtime":"2017-07-09 11:09:04"}
PHP監(jiān)控
https://github.com/kr/beanstalkd/wiki/Tools
https://github.com/ptrofimov/beanstalk_console
https://github.com/jimbojsb/bstools
參考:
https://segmentfault.com/a/1190000002784775
http://www.reibang.com/p/413676e1696f