介紹
在本方案中月洛,使用celery作為任務(wù)分發(fā)平臺(tái)爱谁。對(duì)于存入celery的大量任務(wù),能達(dá)到以下的要求:
- 任務(wù)邏輯相互獨(dú)立
- 橫向擴(kuò)展任務(wù)處理能力
- 抽象(抽象的意義在于脓规,化繁為簡(jiǎn)) 除業(yè)務(wù)邏輯以外的 處理過(guò)程晓猛,將后續(xù)代碼編寫(xiě)的關(guān)注點(diǎn)主要放在業(yè)務(wù)邏輯的實(shí)現(xiàn)上
- 鏈?zhǔn)饺蝿?wù)觸發(fā)
結(jié)構(gòu)
設(shè)計(jì)方案
任務(wù)邏輯封裝
我們將任務(wù)邏輯按照約定的格式封裝饿幅,并以設(shè)置name屬性的方式為任務(wù)打上標(biāo)記,而后通過(guò)current_node/default_node確定app 發(fā)送的節(jié)點(diǎn)戒职。這些準(zhǔn)備工作完成后栗恩,將任務(wù)名稱(chēng)和任務(wù)數(shù)據(jù)交由celery app進(jìn)行發(fā)送。如此能夠保持發(fā)送端的輕量級(jí)洪燥,使任務(wù)更快磕秤、更穩(wěn)定、更無(wú)壓力的發(fā)送到執(zhí)行端捧韵。
以下是任務(wù)的封裝功能偽代碼市咆。
class BaseLogic(object):
logic_name = None
nodes = []
def __init__(self):
if not self.logic_name or not self.nodes:
raise attributeError
def next_node(current_node=None): # 邏輯具有了修改下一個(gè)執(zhí)行節(jié)點(diǎn)的能力。
def send_other(self, other_logic_name):
def send_others(self, other_logic_names):
def set(self, key, value):
def get(self, key, default=None):
def die(self):
def __repr__(self):
class SpiderLogic(BaseLogic):
def crawl(self):
def publish(self):
def store(self):
發(fā)送器封裝
class App():
def __init__(self, app->celery.app):
self.app = app
def send(self, node_name, data=None): # 發(fā)送初始任務(wù)再来。發(fā)送任務(wù)名稱(chēng)和傳遞的數(shù)據(jù)(LogicClass.name, data)
任務(wù)處理流程簡(jiǎn)述
不同節(jié)點(diǎn)的worker會(huì)收到屬于本節(jié)點(diǎn)的任務(wù)蒙兰。worker提供任務(wù)的執(zhí)行流程。
worker是無(wú)狀態(tài)的芒篷,worker的每次運(yùn)行會(huì)傳入此次運(yùn)行所需的數(shù)據(jù)搜变,多次任務(wù)運(yùn)行之間相互不會(huì)產(chǎn)生影響。對(duì)于無(wú)狀態(tài)的系統(tǒng)针炉,可以避免考慮數(shù)據(jù)同步等額外的交互問(wèn)題挠他。同時(shí)根據(jù)任務(wù)數(shù)量級(jí)和任務(wù)執(zhí)行所需要的資源的不同,可以對(duì)worker進(jìn)行橫向擴(kuò)展糊识。
基礎(chǔ)task處理邏輯的封裝功能偽代碼:
class ProcessTask(celery.Task):
"""
worker中實(shí)際運(yùn)行的任務(wù)流程封裝
"""
name = None
def __init__(self):
def run(self, data):
logic_ins = logic_factory(data)
self._run(logic_ins)
self.send_next(logic_ins)
self.send_others(logic_ins)
def _run(self, logic_ins):
raise NotImplementedError
def send_next(self, logic_ins):
node_name = logic_ins.next_nodes.pop()
self.send_to_node(node_name)
def send_others(self, logic_ins):
for node_name in logic_ins.other_nodes:
self.send_to_node(node_name)
def send_to_node(self, node_name):
class CrawlProcessTask(ProcessTask):
name = 'crawl'
def _run(self, logic_ins):
logic_ins.crawl()
收到任務(wù)的名稱(chēng)以后绩社,worker會(huì)通過(guò) 工廠方法 根據(jù)任務(wù)名稱(chēng)實(shí)例化對(duì)應(yīng)的類(lèi)摔蓝,并且按照worker既定的執(zhí)行流程赂苗,執(zhí)行對(duì)應(yīng)的業(yè)務(wù)邏輯。
執(zhí)行完成后贮尉,按照l(shuí)ogic class 既定的順序拌滋,自動(dòng)觸發(fā)下一個(gè)流程。當(dāng)然如果需要將數(shù)據(jù)進(jìn)行鏈?zhǔn)教幚聿卵瑁敲丛谶壿嬵?lèi)中败砂,通過(guò)定義other_logics赌渣,數(shù)據(jù)也會(huì)發(fā)送到對(duì)應(yīng)節(jié)點(diǎn)開(kāi)始新的流程。
總結(jié)
任務(wù)邏輯相互獨(dú)立的意義在于昌犹,當(dāng)一個(gè)任務(wù)需要調(diào)整邏輯時(shí)坚芜,會(huì)自然而然的將修改鎖定在獨(dú)立的代碼塊中,也就是最小化此次修改的影響范圍斜姥。所以我們將任務(wù)相互獨(dú)立的抽象成不同的邏輯類(lèi)鸿竖。而當(dāng)任務(wù)相互獨(dú)立以后,我們需要一個(gè)統(tǒng)一的任務(wù)運(yùn)行機(jī)制铸敏,并且此機(jī)制希望對(duì)于任務(wù)毫無(wú)干預(yù)缚忧,也就是機(jī)制不關(guān)注運(yùn)行任務(wù)的內(nèi)容是什么,而是關(guān)注運(yùn)行任務(wù)的流程杈笔。所以我們對(duì)于任務(wù)運(yùn)行設(shè)計(jì)了一套流程闪水。
在流程中,我們將任務(wù)封裝成為一個(gè)個(gè)的類(lèi)蒙具,在類(lèi)中定義好業(yè)務(wù)邏輯以及處理節(jié)點(diǎn)的順序球榆。類(lèi)通過(guò)一個(gè)統(tǒng)一的入口進(jìn)入流程處理。同時(shí)禁筏,通過(guò)這個(gè)順序芜果,被封裝后的celery app 可以找到首個(gè)接收的worker 節(jié)點(diǎn),然后通過(guò)celery的分布式任務(wù)分發(fā)能力融师,進(jìn)行任務(wù)的分發(fā)右钾。
同時(shí)在類(lèi)中設(shè)置鏈?zhǔn)教幚磉壿嫞獬龁蝹€(gè)任務(wù)之間的壁壘旱爆,將任務(wù)鏈條串起來(lái)舀射,解決任務(wù)之間數(shù)據(jù)交互的問(wèn)題。
故此怀伦,我們將業(yè)務(wù)代碼抽離出任務(wù)分發(fā)流程脆烟,任務(wù)相互獨(dú)立,同時(shí)提供數(shù)據(jù)傳遞的方案房待,保證任務(wù)流程的正常執(zhí)行邢羔。同時(shí)通過(guò)worker節(jié)點(diǎn)的無(wú)狀態(tài),以及celery節(jié)點(diǎn)的擴(kuò)容能力桑孩,使得當(dāng)有大量任務(wù)產(chǎn)生的時(shí)候拜鹤,能夠?qū)θ我馊蝿?wù)節(jié)點(diǎn)數(shù)量進(jìn)行橫向擴(kuò)展。