今天就來點比較有意思的東西
面前兩篇Celery部署爬蟲(一) Celery部署爬蟲(二) 充其量就是 Celery 的入門級弯汰,接下來就深入編寫 Celery 框架指厌,讓 Celery 更 加健壯楣嘁。
首先是定時任務型将,上文的編寫的定時任務是在 config 文件里進行配置的钞速,其實在Celery中提供了一系列的裝飾器,比如前面說的 @app.task 等等藕夫,來看看如何使用裝飾器來實現(xiàn) app 實例中的定時任務
# 定時任務在文件中的寫法
# -*- coding:utf-8 -*-
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
# 通過beat組件周期性將任務發(fā)送給woker執(zhí)行孽糖。在示例中 新建文件period_task.py 并添加任務到配置文件中
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執(zhí)行add
sender.add_periodic_task(
crontab(hour=16, minute=56, day_of_week=1), #每周一下午四點五十六執(zhí)行sayhai
sayhi.s('wd'),name='say_hi'
)
也就是說通過方法調用的形式來執(zhí)行定時任務
一般而言,這種格式需要在配置 Beat 中無法達到邏輯需求的時候使用毅贮,比較少見办悟。
如果在 config 配置文件中編寫了定時任務并希望 Celery Worker 服務進程的同時啟動 Beat 模塊,需要加入 -B 參數(shù)
celery -A haha worker -B -l info
關于路由指定的隊列
一般是一個 task 對應一個隊列 各個隊列相互獨立 可以執(zhí)行不同的操作 比如taskA執(zhí)行即時任務滩褥,taskB執(zhí)行定時 延時任務等等 需要的是在 config 文件里聲明
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
Queue("for_max",Exchange("for_max"),routing_key="for_max")
)
# 路由
CELERY_ROUTES = {
'tasks.add':{"queue":"for_add","routing_key":"for_add"},
'tasks.max':{"queue":"for_max","routing_key":"for_max"}
}
# 路由指明隊列和routing_key
# routing_key認證所在的隊列 用redis名稱要一致(對于redis來說)
指定 CELERY_QUEUES 和 CELERY_ROUTES 選擇要執(zhí)行的任務,這個玩意在分布式任務上還是很方便的病蛉,下面說。
自定義Task
如果在Celery中想自定義 Task 基類,那就需要繼承 Task 模塊
# coding=utf-8
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery import Task
logger = get_task_logger(__name__)
class demotask(Task): # 這是三種運行的狀態(tài)
def on_success(self, retval, task_id, args, kwargs): # 任務成功執(zhí)行
logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))
def on_failure(self, exc, task_id, args, kwargs, einfo): #任務失敗執(zhí)行
logger.info('task id:{} , arg:{} , failed ! error : {}' .format(task_id,args,exc))
def on_retry(self, exc, task_id, args, kwargs, einfo): #任務重試執(zhí)行
logger.info('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
@app.task(base=demotask) # 需要繼承這個基類
def func1(x,y):
try:
a=[]
a[10] =1
except Exception as e:
logger.info(e)
return x+y
@app.task(base=demotask)
def func2(name):
a=[]
a[10] =1
return 'hi {}'.format(name)
@app.task(base=demotask)
def func3(a,b):
return 'result = {} '.format(a+b)
這算是復寫了 Task 模塊铺然,定義了幾種狀態(tài)俗孝,這個可以根據(jù)需求,只要在 task 中聲明要繼承的自定義類即可魄健。
如果想要深入了解 Task 基類赋铝,可以查看 Celery 源碼,這就不廢話了沽瘦。
PS:當使用多個裝飾器裝飾任務函數(shù)時革骨,確保 task 裝飾器最后應用(在python中,這意味它必須在第一個位置)
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
另外在Celery中還提供了自帶的 logger 日志模塊析恋,可以這么用
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Celery的可視化監(jiān)控
Celery Flower 是一款Celery官方推薦使用的監(jiān)控工具
能夠實時監(jiān)控 celery 的 Worker Tasks Borker Result等服務
安裝也是極其簡單
pip install flower
運行服務
celery flower --broker=redis://localhost:6379 --address=127.0.0.1 --port=5555
或者
flower -A proj --broker=redis://localhost:6379
這樣就可以訪問 Flower Web 了苛蒲,瀏覽器訪問 http: //127.0.0.1:5555 查看運行的服務
這樣一來,在 Celery部署爬蟲(一)的百度百科的爬蟲就可以重新編寫了
Config.py文件
# config.py
from __future__ import absolute_import
# broker
import datetime
BROKER_URL = 'redis://127.0.0.1:6379/0'
# backen
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017'
# 導入任務绿满,如tasks.py
CELERY_IMPORTS = ('task', )
# 列化任務載荷的默認的序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE='Asia/Shanghai' # 指定時區(qū)臂外,不指定默認為 'UTC'
# CELERY_TIMEZONE='UTC'
# CELERY_ENABLE_UTC = TrueE='UTC'# CELERY_ENABLE_UTC = True
Celery.py文件
# Celery.py
# coding=utf-8
from __future__ import absolute_import
from celery import Celery
app = Celery("STQ")
# 加載配置模塊
app.config_from_object('config')
if __name__ == '__main__':
app.start()
Task.py 文件
# coding=utf-8
from __future__ import absolute_import
import re
import requests
from lxml import etree
import urllib.request
from Celery import app
headers ={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
def get_html(url):
r = urllib.request.Request(url,headers=headers)
res = urllib.request.urlopen(r)
return res.read().decode("utf-8")
@app.task
def parse(html):
ret = re.compile(r'<title>(.*?)</title>')
title = re.findall(ret,html)
return title
Run_task.py 文件
from __future__ import absolute_import
from tasks import get_html,parse
with open("url.txt","r") as f:
for data in f.readlines():
url = data.strip('\n')
html = get_html(url)
result = parse.delay(html)
黑窗口鍵入
celery -A Celery worker -l info -P gevent -c 10
然后發(fā)布任務讓它從消息隊列中消費
python run_task.py
這樣一來。所有的參數(shù)配置都可以在 config 文件中進行配置了
關于路由 隊列的使用
CELERY_QUEUES 設置一個指定 routing_key 的隊列
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
)
CELERY_ROUTES 設置路由
通過 routing_key 來關聯(lián)上面的 CELERY_QUEUES喇颁,所以這里的 routing_key 需要和上面參數(shù)的一致漏健。
# 路由
CELERY_ROUTES = {
'tasks.add':{"queue":"for_add","routing_key":"for_add"},
}
tasks.add 表示在 tasks.py 的函數(shù)文件的 add 方法屬于哪個路由,而這個路由關聯(lián)著哪個隊列橘霎。也就是說聲明哪個任務放入哪個隊列
如果開啟了這個路由蔫浆,那么此任務就會被執(zhí)行,反之則不會姐叁。
那么如果沒有指定隊列的方法要怎么執(zhí)行呢瓦盛?
celery -A Celery worker -l info -n add -Q for_add -P gevent -c 10
可以在命令中去指定某個任務對應某個隊列,所以外潜,發(fā)布出去的任務會通過路由關聯(lián)到指定隊列原环,不同 worker 會從不同的隊列來消費任務。
而且每個隊列都是相互獨立的处窥,這樣一來嘱吗,每個任務之間就不會相互影響了,即時任務滔驾,定時任務就可以有明確的分工谒麦。
分布式集群
接下來就該說說 Celery 分布式,基本的架構就是通過中間件來共享消息隊列哆致。
事實上分布式架構的核心無非就是機器間的通信绕德,而一般的分布式爬蟲架構都會體現(xiàn)在共享數(shù)據(jù)庫隊列,Redis 和 RabbitMQ 就是典型的消息隊列摊阀。
比如通過redis來配置共享中間件
redis://ip:6379/0
比如現(xiàn)在有 Master, Slave1, Slave2三臺主機坛梁,使用Master編寫主配置文件,celery主文件炸客,任務和調度文件坟乾。
然后拷貝兩份分別放到兩臺Slave里去,在黑窗口中分別鍵入
celery -A Celery worker -l info
可以看到,在終端中服務器之間已經(jīng)相互連通
然后發(fā)布任務
python run_haha.py
本來監(jiān)聽就監(jiān)聽著 redis隊列 的機器就會進行任務消費
需要知道的一點就是,運行隊列里的任務的時候,每個機器真正調用的是自身任務文件里面的任務函數(shù)枝缔,只要該函數(shù)存在于隊列之中。
然后就和一般的分布式?jīng)]啥兩樣了蚊惯。
好了愿卸,今天就這樣了
但是,我在想截型,是不是應該來點刺激的
歡迎轉載趴荸,但要聲明出處,不然我順著網(wǎng)線過去就是一拳宦焦。
個人技術博客:http://www.gzky.live