一、使用celery的原因
分布式任務(wù)調(diào)度框架celery及其監(jiān)控工具flower催蝗,Linux進(jìn)程管理工具supervisor
項(xiàng)目痛點(diǎn):
1切威、代碼上線及運(yùn)維困難,新代碼上線必須保證系統(tǒng)中沒有正在運(yùn)行的異步任務(wù)丙号,等待任務(wù)結(jié)束期間無法保證系統(tǒng)不在接收新任務(wù)先朦。(項(xiàng)目中進(jìn)程多是以multiprocessing方式啟動)
2、重啟困難犬缨,重啟后不知道是否啟動成功喳魏,必須手動curl測試接口保證系統(tǒng)重啟成功,缺少重啟監(jiān)控機(jī)制怀薛。
痛點(diǎn)解決:
1刺彩、celery解決中斷任務(wù)痛點(diǎn),所有異步任務(wù)均由celery下發(fā)枝恋〈淳螅可單獨(dú)重啟一個(gè)worker或所有worker。重啟worker時(shí)保證當(dāng)前worker正在消費(fèi)的任務(wù)重新回到隊(duì)列焚碌,等待處于工作狀態(tài)的worker消費(fèi)畦攘。不同worker可運(yùn)行不同版本的代碼。
2呐能、supervisor解決重啟痛點(diǎn)念搬,新架構(gòu)中一個(gè)節(jié)點(diǎn)會啟多worker以及flower和后端服務(wù),具有大量進(jìn)程需要管理摆出,手動管理已然不現(xiàn)實(shí)朗徊。supervisor可對啟動異常的進(jìn)程自動重啟也可對異常退出的進(jìn)程進(jìn)行拉起,并且提供客戶端和web界面偎漫。
二爷恳、架構(gòu)圖
三、調(diào)度框架celery
celery中的幾個(gè)概念
1象踊、broker 消息傳輸中間件温亲,可以簡單理解為隊(duì)列,支持RabbitMQ杯矩,Redis栈虚,SQS(某些博客說支持sqlalchemy,官網(wǎng)未找到史隆,實(shí)驗(yàn)也未成功)魂务。celery對Redis Cluster類型的redis集群支持不是很好,目前正在尋找解決方案泌射。
2粘姜、exchange 路由取胎,可將特定任務(wù)路由到指定隊(duì)列霎迫。
3、worker 消費(fèi)者吃既。會在多節(jié)點(diǎn)啟多worker
4拒秘、task 異步任務(wù)号显。某些任務(wù)需要指定消費(fèi)節(jié)點(diǎn)。所以觸發(fā)任務(wù)時(shí)需要顯式指定該任務(wù)的存放的隊(duì)列翼抠,task.apply_async(queue='q1')咙轩。未指定的將會放到default隊(duì)列,由三個(gè)節(jié)點(diǎn)競爭阴颖。
5活喊、backend 結(jié)果存儲×坷ⅲ可使用mq钾菊,redis,nosql偎肃、mysql等煞烫。存放任務(wù)執(zhí)行的結(jié)果。
1累颂、使用方案
一滞详、異步任務(wù)
設(shè)置default凛俱,q1,q2料饥,q3四個(gè)隊(duì)列蒲犬,各節(jié)點(diǎn)會監(jiān)聽各自的隊(duì)列,并且所有節(jié)點(diǎn)都監(jiān)聽default隊(duì)列岸啡。
編寫異步任務(wù)和正常寫函數(shù)是一樣的原叮,最后只需要對該函數(shù)使用裝飾器@celery.task將該任務(wù)注冊為異步任務(wù)。如果有多個(gè)裝飾器進(jìn)行組合使用時(shí)巡蘸,必須確保 task()
裝飾器被放置在首位:
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
觸發(fā)任務(wù)
簡單觸發(fā)時(shí)可使用 delay
奋隶,但是該方法無法指定存放的隊(duì)列,因此該任務(wù)會被放到默認(rèn)隊(duì)列
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
如果需要設(shè)置額外的行參數(shù)悦荒,必須用 apply_async
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x'}, queue='q1')
啟動worker
celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-1@%%h
二唯欣、beat任務(wù)(定時(shí)任務(wù))
對于celery產(chǎn)生的定時(shí)任務(wù)如果放到一個(gè)隊(duì)列里,該任務(wù)被一個(gè)worker拿到后其他worker將獲取不到該任務(wù)搬味。這樣會產(chǎn)生一個(gè)現(xiàn)象即該任務(wù)只在一個(gè)節(jié)點(diǎn)執(zhí)行了黍聂,但業(yè)務(wù)上需要的是該任務(wù)在各個(gè)節(jié)點(diǎn)都執(zhí)行。
對此現(xiàn)象的解決方案是各節(jié)點(diǎn)需要定制各節(jié)點(diǎn)的定時(shí)任務(wù)并放到各自的隊(duì)列里身腻。對于任務(wù)a生成者會將其產(chǎn)生三份對下發(fā)到三個(gè)節(jié)點(diǎn)产还。對于任務(wù)b,node1并不需要執(zhí)行嘀趟,因此會產(chǎn)生兩份并將其下發(fā)到node2和node3上脐区。
這并不意味同一個(gè)任務(wù)需要編寫三份,任務(wù)編寫完后只需要將其注冊各節(jié)點(diǎn)對應(yīng)的配置里(需要自己實(shí)現(xiàn))她按。
為不影響其他異步任務(wù)執(zhí)行牛隅,beat將會由各節(jié)點(diǎn)單獨(dú)的worker進(jìn)行消費(fèi)。
產(chǎn)生beat任務(wù)
celery -A app.celery beat -l info
消費(fèi)beat任務(wù)
celery -A app.celery worker -l info -Q node1-crontab --concurrency=10 -n node1-worker-crontab@%%h
這里的node1-crontab為新的隊(duì)列酌泰,專門存放node1節(jié)點(diǎn)需要消費(fèi)的定時(shí)任務(wù)媒佣。
三、task是如何工作的
這里會說明為什么不同worker可以運(yùn)行不同版本的代碼陵刹,甚至生產(chǎn)者和消費(fèi)者之間也可以運(yùn)行不同版本的代碼默伍。
celery的任務(wù)是注冊在注冊表中,該表中注冊了任務(wù)名和任務(wù)類衰琐。說人話就是celery會在隊(duì)列中傳遞任務(wù)的模塊也糊,例如proj模塊中有一個(gè)task.py,該文件中編寫了一個(gè)叫add的異步任務(wù)(函數(shù))羡宙,那么celery傳遞的就是proj.task.add狸剃,只要保證消費(fèi)該任務(wù)的worker中有該模塊該文件該函數(shù)就行,worker并不關(guān)心該函數(shù)里是怎樣執(zhí)行的狗热,是否和生產(chǎn)者一致钞馁。
任務(wù)狀態(tài)
- PENDING 任務(wù)正在等待執(zhí)行或未知虑省。任何未知的任務(wù) ID 都默認(rèn)處于掛起狀態(tài)。
- STARTED 任務(wù)開始執(zhí)行
- SUCCESS 任務(wù)執(zhí)行成功
- FAILURE 任務(wù)執(zhí)行失敗
- RETRY 任務(wù)處于重試狀態(tài)僧凰,這里指在task中捕獲到異常并顯式調(diào)用celery使其重試
- REVOKED 任務(wù)被撤銷
@celery.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
2慷妙、celery監(jiān)控工具flower
Flower是基于web的監(jiān)控和管理celery工具
flower可以
- 用Celery事件實(shí)時(shí)監(jiān)控,顯示任務(wù)的詳細(xì)信息允悦,圖形化和統(tǒng)計(jì)
- 查看worker狀態(tài)和統(tǒng)計(jì),查看當(dāng)前正在運(yùn)行的tasks
- Broker monitoring(中間人監(jiān)控)虑啤,查看所有Celery 隊(duì)列的統(tǒng)計(jì)隙弛,隊(duì)列長度圖
flower只需啟動在生產(chǎn)者端即可
截圖展示
四、進(jìn)程管理工具supervisor
粗略估計(jì)在node1上會啟后端服務(wù)狞山,celery worker三個(gè)全闷,定時(shí)任務(wù)消費(fèi)worker一個(gè),celery beat一個(gè)萍启,flower進(jìn)程总珠。這么多進(jìn)程用手工一個(gè)個(gè)啟動肯定要花費(fèi)大量時(shí)間,于是用supervisor管理這些進(jìn)程勘纯。
supervisor會已啟動自己子進(jìn)程的方式開啟進(jìn)程局服,可以對異常退出的進(jìn)程進(jìn)行重啟操作。
supervisor可以分為三個(gè)部分
- supervisord 服務(wù)端驳遵,主要負(fù)責(zé)啟動與管理進(jìn)程淫奔,響應(yīng)客戶端的請求
- supervisorctl 客戶端,提供一個(gè)命令行來使用supervisord提供的服務(wù)
- web界面 用來查看與管理子進(jìn)程
1堤结、子進(jìn)程配置
[program:worker]
command=celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-%(process_num)s@%%h ; 啟動命令
process_name=%(program_name)s-%(process_num)d ; 進(jìn)程名
numprocs=3 ; 進(jìn)程數(shù)量
directory=/Users/aaa/PycharmProjects/flask_test ; 工作路徑
;umask=022 ; umask for process (default None)
priority=999 ; 優(yōu)先級唆迁。優(yōu)先級低,最先啟動竞穷,關(guān)閉的時(shí)候最后關(guān)閉
autostart=true ; supervisor啟動后自動啟動
startsecs=1 ; 啟動多少秒后是running認(rèn)為啟動成功
;startretries=3 ; 最大啟動重試次數(shù) (default 3)
autorestart=true ; 子進(jìn)程掛掉自動重啟 (def: unexpected)
;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
stopsignal=TERM ; 進(jìn)程停止信號唐责,停止celery worker時(shí)使用TERM, (TERM, HUP, INT, QUIT, KILL, USR1, or USR2)
stopwaitsecs=30 ; 等待停止最大時(shí)間,超過此時(shí)間會強(qiáng)制kill (default 10)
stopasgroup=true ; 停掉子進(jìn)程的子進(jìn)程(保證不會出現(xiàn)孤兒進(jìn)程)
;killasgroup=true ; kill進(jìn)程及其子進(jìn)程瘾带,直接發(fā)送KILL信號不會等待進(jìn)程退出
;user=chrism ; 管理子進(jìn)程的用戶
redirect_stderr=true ; redirect 日志 stderr to stdout
stdout_logfile=/Users/aaa/PycharmProjects/flask_test/log/node1/celery-worker-1.log ; 日志
stdout_logfile_maxbytes=50MB ; 單個(gè)日志文件最大大小 (default 50MB)
stdout_logfile_backups=20 ; 日志文件數(shù)量 (default 10)
;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
;stdout_syslog=false ; send stdout to syslog with process name (default false)
;stderr_logfile=/Users/aaa/PycharmProjects/flask_test/log/default/celery_err.log ; 錯(cuò)誤日志
;stderr_logfile_maxbytes=10MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false ; emit events on stderpidr writes (default false)
;stderr_syslog=false ; send stderr to syslog with process name (default false)
environment=PATH="/Users/aaa/anaconda3/envs/flask_test/bin" ; 環(huán)境變量鼠哥,子進(jìn)程間不共享
;serverurl=AUTO ; override serverurl computation (childutils)
supervisor配置文件放在supervisord.conf中
啟動supervisord
supervisord -c supervisord.conf
對于celery worker節(jié)點(diǎn)進(jìn)程退出信號使用TERM,TERM信號會使worker進(jìn)行熱關(guān)機(jī)看政,worker會將未消費(fèi)完的任務(wù)放回到隊(duì)列肴盏。
發(fā)現(xiàn)丟任務(wù)的情況:
假設(shè)worker1正在消費(fèi)任務(wù)3個(gè),worker2正在消費(fèi)任務(wù)4個(gè)帽衙。將worker1關(guān)機(jī)菜皂,3個(gè)任務(wù)會進(jìn)入到worker2,再將work2關(guān)機(jī)后打開worker3厉萝,這時(shí)會發(fā)現(xiàn)少了兩個(gè)任務(wù)恍飘。
丟任務(wù)的解決方案:
方案一:啟動一個(gè)worker然后將其關(guān)機(jī)后未消費(fèi)完的任務(wù)可以全部回到隊(duì)列榨崩,需要重啟時(shí)可以先將未有消費(fèi)任務(wù)的worker進(jìn)行重啟,然后再停掉正在消費(fèi)的worker章母∧钢耄或者只停掉一個(gè)worker。
方案二:supervisor進(jìn)程組的概念乳怎,直接將進(jìn)程組重啟
代碼更新后只需重啟子進(jìn)程彩郊,不需要重啟supervisord
進(jìn)程組配置
[group:celery-worker]
programs=worker ;上面實(shí)例三個(gè)進(jìn)程會默認(rèn)分配到名為worker的進(jìn)程組,這里定義進(jìn)程組會覆蓋默認(rèn)的
priority=999 ; the relative start priority (default 999)
對進(jìn)程組進(jìn)行操作等同于對進(jìn)程組下所有的進(jìn)程操作
對于進(jìn)程組操作在進(jìn)程組名后需要加上冒號即 celery-worker:
2蚪缀、進(jìn)程數(shù)說明
問題:celery啟動命令中已經(jīng)指定了 --concurrency=10
參數(shù)配置worker中開啟的進(jìn)程數(shù)量秫逝,為什么在supervisord中還要指定 numprocs=3
進(jìn)程數(shù)呢?
答:這兩個(gè)參數(shù)指定的進(jìn)程數(shù)量是不同的意義询枚。在celery中指定進(jìn)程數(shù)即意味著單個(gè)worker中可開啟的最大進(jìn)程數(shù)據(jù)量违帆。在supervisord指定的進(jìn)程數(shù)會直接開啟三個(gè)worker,相當(dāng)將定義的cmd執(zhí)行了三次金蜀。
supervisord中如果指定numprocs
的同時(shí)也需要指定 process_name=%(program_name)s-%(process_num)d
刷后,原因在于如果多個(gè)進(jìn)程使用相同的進(jìn)程名會報(bào)錯(cuò),所以需要指定不同的進(jìn)程名渊抄。program_name
為 [program:worker]
中定義的名字尝胆,即worker。process_num
為進(jìn)程的序號护桦,從1開始班巩,注意它不是pid。
使用numprocs=3
創(chuàng)建的三個(gè)worker默認(rèn)會被放到一個(gè)名為worker(在哪里定義)的進(jìn)程組里嘶炭,如果在后面定義一個(gè)新的進(jìn)程組并將worker放進(jìn)去則這三個(gè)worker會的默認(rèn)進(jìn)程組會被替換為新的進(jìn)程組抱慌,同時(shí)新的進(jìn)程組里也可以放一個(gè)在其它program里定義的進(jìn)程。
[group:node1-celery-worker]
programs=worker,crontab-worker # crontab-worker為在其它program里定義的進(jìn)程
priority=999
3眨猎、supervisorctl命令
supervisorctl start ${program} # 啟動進(jìn)程
supervisorctl stop ${program} # 停止進(jìn)程
supervisorctl restart ${program} # 重啟進(jìn)程
supervisorctl status ${program} # 查看進(jìn)程狀態(tài)
supervisorctl update # 重新載入配置文件
supervisorctl shutdown # 關(guān)閉supervisord服務(wù)
supervisorctl reload # 重啟supervisord服務(wù)
supervisorctl stop all # 停止所有進(jìn)程
對于已經(jīng)配置好的supervisor并不需要進(jìn)行supervisord級別的重啟以及重新載入配置抑进。代碼更新后只需重啟子進(jìn)程即可加載最新代碼。
4睡陪、supervisor界面
將supervisorctl的命令可視化寺渗,可以直接點(diǎn)點(diǎn)點(diǎn),此外還可以展示子進(jìn)程的日志兰迫。實(shí)則感覺有supervisorctl就可以了