分布式任務(wù)調(diào)度celery與進(jìn)程管理supervisord

一、使用celery的原因

分布式任務(wù)調(diào)度框架celery及其監(jiān)控工具flower催蝗,Linux進(jìn)程管理工具supervisor

項(xiàng)目痛點(diǎn):
1切威、代碼上線及運(yùn)維困難,新代碼上線必須保證系統(tǒng)中沒有正在運(yùn)行的異步任務(wù)丙号,等待任務(wù)結(jié)束期間無法保證系統(tǒng)不在接收新任務(wù)先朦。(項(xiàng)目中進(jìn)程多是以multiprocessing方式啟動)
2、重啟困難犬缨,重啟后不知道是否啟動成功喳魏,必須手動curl測試接口保證系統(tǒng)重啟成功,缺少重啟監(jiān)控機(jī)制怀薛。

痛點(diǎn)解決:
1刺彩、celery解決中斷任務(wù)痛點(diǎn),所有異步任務(wù)均由celery下發(fā)枝恋〈淳螅可單獨(dú)重啟一個(gè)worker或所有worker。重啟worker時(shí)保證當(dāng)前worker正在消費(fèi)的任務(wù)重新回到隊(duì)列焚碌,等待處于工作狀態(tài)的worker消費(fèi)畦攘。不同worker可運(yùn)行不同版本的代碼。
2呐能、supervisor解決重啟痛點(diǎn)念搬,新架構(gòu)中一個(gè)節(jié)點(diǎn)會啟多worker以及flower和后端服務(wù),具有大量進(jìn)程需要管理摆出,手動管理已然不現(xiàn)實(shí)朗徊。supervisor可對啟動異常的進(jìn)程自動重啟也可對異常退出的進(jìn)程進(jìn)行拉起,并且提供客戶端和web界面偎漫。

二爷恳、架構(gòu)圖

celery 架構(gòu).png

三、調(diào)度框架celery

celery中的幾個(gè)概念

1象踊、broker 消息傳輸中間件温亲,可以簡單理解為隊(duì)列,支持RabbitMQ杯矩,Redis栈虚,SQS(某些博客說支持sqlalchemy,官網(wǎng)未找到史隆,實(shí)驗(yàn)也未成功)魂务。celery對Redis Cluster類型的redis集群支持不是很好,目前正在尋找解決方案泌射。
2粘姜、exchange 路由取胎,可將特定任務(wù)路由到指定隊(duì)列霎迫。
3、worker 消費(fèi)者吃既。會在多節(jié)點(diǎn)啟多worker
4拒秘、task 異步任務(wù)号显。某些任務(wù)需要指定消費(fèi)節(jié)點(diǎn)。所以觸發(fā)任務(wù)時(shí)需要顯式指定該任務(wù)的存放的隊(duì)列翼抠,task.apply_async(queue='q1')咙轩。未指定的將會放到default隊(duì)列,由三個(gè)節(jié)點(diǎn)競爭阴颖。
5活喊、backend 結(jié)果存儲×坷ⅲ可使用mq钾菊,redis,nosql偎肃、mysql等煞烫。存放任務(wù)執(zhí)行的結(jié)果。

1累颂、使用方案

一滞详、異步任務(wù)

設(shè)置default凛俱,q1,q2料饥,q3四個(gè)隊(duì)列蒲犬,各節(jié)點(diǎn)會監(jiān)聽各自的隊(duì)列,并且所有節(jié)點(diǎn)都監(jiān)聽default隊(duì)列岸啡。
編寫異步任務(wù)和正常寫函數(shù)是一樣的原叮,最后只需要對該函數(shù)使用裝飾器@celery.task將該任務(wù)注冊為異步任務(wù)。如果有多個(gè)裝飾器進(jìn)行組合使用時(shí)巡蘸,必須確保 task() 裝飾器被放置在首位:

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

觸發(fā)任務(wù)
簡單觸發(fā)時(shí)可使用 delay 奋隶,但是該方法無法指定存放的隊(duì)列,因此該任務(wù)會被放到默認(rèn)隊(duì)列

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

如果需要設(shè)置額外的行參數(shù)悦荒,必須用 apply_async

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x'}, queue='q1')

啟動worker

celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-1@%%h

二唯欣、beat任務(wù)(定時(shí)任務(wù))

對于celery產(chǎn)生的定時(shí)任務(wù)如果放到一個(gè)隊(duì)列里,該任務(wù)被一個(gè)worker拿到后其他worker將獲取不到該任務(wù)搬味。這樣會產(chǎn)生一個(gè)現(xiàn)象即該任務(wù)只在一個(gè)節(jié)點(diǎn)執(zhí)行了黍聂,但業(yè)務(wù)上需要的是該任務(wù)在各個(gè)節(jié)點(diǎn)都執(zhí)行。
對此現(xiàn)象的解決方案是各節(jié)點(diǎn)需要定制各節(jié)點(diǎn)的定時(shí)任務(wù)并放到各自的隊(duì)列里身腻。對于任務(wù)a生成者會將其產(chǎn)生三份對下發(fā)到三個(gè)節(jié)點(diǎn)产还。對于任務(wù)b,node1并不需要執(zhí)行嘀趟,因此會產(chǎn)生兩份并將其下發(fā)到node2和node3上脐区。
這并不意味同一個(gè)任務(wù)需要編寫三份,任務(wù)編寫完后只需要將其注冊各節(jié)點(diǎn)對應(yīng)的配置里(需要自己實(shí)現(xiàn))她按。
為不影響其他異步任務(wù)執(zhí)行牛隅,beat將會由各節(jié)點(diǎn)單獨(dú)的worker進(jìn)行消費(fèi)。

產(chǎn)生beat任務(wù)

celery -A app.celery beat -l info

消費(fèi)beat任務(wù)

celery -A app.celery worker -l info -Q node1-crontab --concurrency=10 -n node1-worker-crontab@%%h

這里的node1-crontab為新的隊(duì)列酌泰,專門存放node1節(jié)點(diǎn)需要消費(fèi)的定時(shí)任務(wù)媒佣。

三、task是如何工作的

這里會說明為什么不同worker可以運(yùn)行不同版本的代碼陵刹,甚至生產(chǎn)者和消費(fèi)者之間也可以運(yùn)行不同版本的代碼默伍。

celery的任務(wù)是注冊在注冊表中,該表中注冊了任務(wù)名和任務(wù)類衰琐。說人話就是celery會在隊(duì)列中傳遞任務(wù)的模塊也糊,例如proj模塊中有一個(gè)task.py,該文件中編寫了一個(gè)叫add的異步任務(wù)(函數(shù))羡宙,那么celery傳遞的就是proj.task.add狸剃,只要保證消費(fèi)該任務(wù)的worker中有該模塊該文件該函數(shù)就行,worker并不關(guān)心該函數(shù)里是怎樣執(zhí)行的狗热,是否和生產(chǎn)者一致钞馁。

任務(wù)狀態(tài)

  • PENDING 任務(wù)正在等待執(zhí)行或未知虑省。任何未知的任務(wù) ID 都默認(rèn)處于掛起狀態(tài)。
  • STARTED 任務(wù)開始執(zhí)行
  • SUCCESS 任務(wù)執(zhí)行成功
  • FAILURE 任務(wù)執(zhí)行失敗
  • RETRY 任務(wù)處于重試狀態(tài)僧凰,這里指在task中捕獲到異常并顯式調(diào)用celery使其重試
  • REVOKED 任務(wù)被撤銷
@celery.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

2慷妙、celery監(jiān)控工具flower

Flower是基于web的監(jiān)控和管理celery工具

flower可以

  • 用Celery事件實(shí)時(shí)監(jiān)控,顯示任務(wù)的詳細(xì)信息允悦,圖形化和統(tǒng)計(jì)
  • 查看worker狀態(tài)和統(tǒng)計(jì),查看當(dāng)前正在運(yùn)行的tasks
  • Broker monitoring(中間人監(jiān)控)虑啤,查看所有Celery 隊(duì)列的統(tǒng)計(jì)隙弛,隊(duì)列長度圖

flower只需啟動在生產(chǎn)者端即可

截圖展示

image.png

四、進(jìn)程管理工具supervisor

粗略估計(jì)在node1上會啟后端服務(wù)狞山,celery worker三個(gè)全闷,定時(shí)任務(wù)消費(fèi)worker一個(gè),celery beat一個(gè)萍启,flower進(jìn)程总珠。這么多進(jìn)程用手工一個(gè)個(gè)啟動肯定要花費(fèi)大量時(shí)間,于是用supervisor管理這些進(jìn)程勘纯。
supervisor會已啟動自己子進(jìn)程的方式開啟進(jìn)程局服,可以對異常退出的進(jìn)程進(jìn)行重啟操作。
supervisor可以分為三個(gè)部分

  1. supervisord 服務(wù)端驳遵,主要負(fù)責(zé)啟動與管理進(jìn)程淫奔,響應(yīng)客戶端的請求
  2. supervisorctl 客戶端,提供一個(gè)命令行來使用supervisord提供的服務(wù)
  3. web界面 用來查看與管理子進(jìn)程

1堤结、子進(jìn)程配置

[program:worker]
command=celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-%(process_num)s@%%h   ; 啟動命令
process_name=%(program_name)s-%(process_num)d ; 進(jìn)程名
numprocs=3                    ; 進(jìn)程數(shù)量
directory=/Users/aaa/PycharmProjects/flask_test         ; 工作路徑
;umask=022                     ; umask for process (default None)
priority=999                  ; 優(yōu)先級唆迁。優(yōu)先級低,最先啟動竞穷,關(guān)閉的時(shí)候最后關(guān)閉
autostart=true                ; supervisor啟動后自動啟動
startsecs=1                   ; 啟動多少秒后是running認(rèn)為啟動成功
;startretries=3                ; 最大啟動重試次數(shù) (default 3)
autorestart=true               ; 子進(jìn)程掛掉自動重啟 (def: unexpected)
;exitcodes=0                   ; 'expected' exit codes used with autorestart (default 0)
stopsignal=TERM               ; 進(jìn)程停止信號唐责,停止celery worker時(shí)使用TERM, (TERM, HUP, INT, QUIT, KILL, USR1, or USR2)
stopwaitsecs=30               ; 等待停止最大時(shí)間,超過此時(shí)間會強(qiáng)制kill (default 10)
stopasgroup=true              ; 停掉子進(jìn)程的子進(jìn)程(保證不會出現(xiàn)孤兒進(jìn)程)
;killasgroup=true             ; kill進(jìn)程及其子進(jìn)程瘾带,直接發(fā)送KILL信號不會等待進(jìn)程退出
;user=chrism                   ; 管理子進(jìn)程的用戶
redirect_stderr=true          ; redirect 日志 stderr to stdout
stdout_logfile=/Users/aaa/PycharmProjects/flask_test/log/node1/celery-worker-1.log   ; 日志
stdout_logfile_maxbytes=50MB   ; 單個(gè)日志文件最大大小  (default 50MB)
stdout_logfile_backups=20     ; 日志文件數(shù)量  (default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stdout_syslog=false           ; send stdout to syslog with process name (default false)
;stderr_logfile=/Users/aaa/PycharmProjects/flask_test/log/default/celery_err.log       ; 錯(cuò)誤日志
;stderr_logfile_maxbytes=10MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderpidr writes (default false)
;stderr_syslog=false           ; send stderr to syslog with process name (default false)
environment=PATH="/Users/aaa/anaconda3/envs/flask_test/bin"      ; 環(huán)境變量鼠哥,子進(jìn)程間不共享
;serverurl=AUTO                ; override serverurl computation (childutils)

supervisor配置文件放在supervisord.conf中

啟動supervisord

supervisord -c supervisord.conf

對于celery worker節(jié)點(diǎn)進(jìn)程退出信號使用TERM,TERM信號會使worker進(jìn)行熱關(guān)機(jī)看政,worker會將未消費(fèi)完的任務(wù)放回到隊(duì)列肴盏。

發(fā)現(xiàn)丟任務(wù)的情況:
假設(shè)worker1正在消費(fèi)任務(wù)3個(gè),worker2正在消費(fèi)任務(wù)4個(gè)帽衙。將worker1關(guān)機(jī)菜皂,3個(gè)任務(wù)會進(jìn)入到worker2,再將work2關(guān)機(jī)后打開worker3厉萝,這時(shí)會發(fā)現(xiàn)少了兩個(gè)任務(wù)恍飘。

丟任務(wù)的解決方案:
方案一:啟動一個(gè)worker然后將其關(guān)機(jī)后未消費(fèi)完的任務(wù)可以全部回到隊(duì)列榨崩,需要重啟時(shí)可以先將未有消費(fèi)任務(wù)的worker進(jìn)行重啟,然后再停掉正在消費(fèi)的worker章母∧钢耄或者只停掉一個(gè)worker。
方案二:supervisor進(jìn)程組的概念乳怎,直接將進(jìn)程組重啟

代碼更新后只需重啟子進(jìn)程彩郊,不需要重啟supervisord

進(jìn)程組配置

[group:celery-worker]
programs=worker  ;上面實(shí)例三個(gè)進(jìn)程會默認(rèn)分配到名為worker的進(jìn)程組,這里定義進(jìn)程組會覆蓋默認(rèn)的
priority=999                  ; the relative start priority (default 999)

對進(jìn)程組進(jìn)行操作等同于對進(jìn)程組下所有的進(jìn)程操作

對于進(jìn)程組操作在進(jìn)程組名后需要加上冒號即 celery-worker:

2蚪缀、進(jìn)程數(shù)說明

問題:celery啟動命令中已經(jīng)指定了 --concurrency=10 參數(shù)配置worker中開啟的進(jìn)程數(shù)量秫逝,為什么在supervisord中還要指定 numprocs=3 進(jìn)程數(shù)呢?

答:這兩個(gè)參數(shù)指定的進(jìn)程數(shù)量是不同的意義询枚。在celery中指定進(jìn)程數(shù)即意味著單個(gè)worker中可開啟的最大進(jìn)程數(shù)據(jù)量违帆。在supervisord指定的進(jìn)程數(shù)會直接開啟三個(gè)worker,相當(dāng)將定義的cmd執(zhí)行了三次金蜀。

supervisord中如果指定numprocs的同時(shí)也需要指定 process_name=%(program_name)s-%(process_num)d 刷后,原因在于如果多個(gè)進(jìn)程使用相同的進(jìn)程名會報(bào)錯(cuò),所以需要指定不同的進(jìn)程名渊抄。program_name[program:worker] 中定義的名字尝胆,即worker。process_num為進(jìn)程的序號护桦,從1開始班巩,注意它不是pid。

使用numprocs=3 創(chuàng)建的三個(gè)worker默認(rèn)會被放到一個(gè)名為worker(在哪里定義)的進(jìn)程組里嘶炭,如果在后面定義一個(gè)新的進(jìn)程組并將worker放進(jìn)去則這三個(gè)worker會的默認(rèn)進(jìn)程組會被替換為新的進(jìn)程組抱慌,同時(shí)新的進(jìn)程組里也可以放一個(gè)在其它program里定義的進(jìn)程。

[group:node1-celery-worker]
programs=worker,crontab-worker  # crontab-worker為在其它program里定義的進(jìn)程
priority=999 

3眨猎、supervisorctl命令

supervisorctl start ${program}     # 啟動進(jìn)程
supervisorctl stop ${program}      # 停止進(jìn)程
supervisorctl restart ${program}   # 重啟進(jìn)程
supervisorctl status ${program}    # 查看進(jìn)程狀態(tài)
supervisorctl update               # 重新載入配置文件
supervisorctl shutdown             # 關(guān)閉supervisord服務(wù)
supervisorctl reload               # 重啟supervisord服務(wù)
supervisorctl stop all             # 停止所有進(jìn)程

對于已經(jīng)配置好的supervisor并不需要進(jìn)行supervisord級別的重啟以及重新載入配置抑进。代碼更新后只需重啟子進(jìn)程即可加載最新代碼。

image.png

4睡陪、supervisor界面

將supervisorctl的命令可視化寺渗,可以直接點(diǎn)點(diǎn)點(diǎn),此外還可以展示子進(jìn)程的日志兰迫。實(shí)則感覺有supervisorctl就可以了


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末信殊,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子汁果,更是在濱河造成了極大的恐慌涡拘,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件据德,死亡現(xiàn)場離奇詭異鳄乏,居然都是意外死亡跷车,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進(jìn)店門橱野,熙熙樓的掌柜王于貴愁眉苦臉地迎上來朽缴,“玉大人,你說我怎么就攤上這事水援∶芮浚” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵蜗元,是天一觀的道長或渤。 經(jīng)常有香客問我,道長许帐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任毕谴,我火速辦了婚禮成畦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘涝开。我一直安慰自己循帐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布舀武。 她就那樣靜靜地躺著拄养,像睡著了一般。 火紅的嫁衣襯著肌膚如雪银舱。 梳的紋絲不亂的頭發(fā)上瘪匿,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天,我揣著相機(jī)與錄音寻馏,去河邊找鬼棋弥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛诚欠,可吹牛的內(nèi)容都是我干的顽染。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼轰绵,長吁一口氣:“原來是場噩夢啊……” “哼粉寞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起左腔,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤唧垦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后液样,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體业崖,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡野芒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了双炕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片狞悲。...
    茶點(diǎn)故事閱讀 40,013評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖妇斤,靈堂內(nèi)的尸體忽然破棺而出摇锋,到底是詐尸還是另有隱情,我是刑警寧澤站超,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布荸恕,位于F島的核電站,受9級特大地震影響死相,放射性物質(zhì)發(fā)生泄漏融求。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一算撮、第九天 我趴在偏房一處隱蔽的房頂上張望生宛。 院中可真熱鬧,春花似錦肮柜、人聲如沸陷舅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽莱睁。三九已至,卻和暖如春芒澜,著一層夾襖步出監(jiān)牢的瞬間仰剿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工痴晦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留酥馍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓阅酪,卻偏偏與公主長得像旨袒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子术辐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評論 2 355